This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 762f0fcc30803c20158cc1a75f6eac2fa530a44f Author: Alexey Serbin <[email protected]> AuthorDate: Mon May 6 19:10:39 2019 -0700 [util] introduce Synchronizer::WaitUntil() Added Synchronizer::WaitUntil() method to avoid converting MonoTime into MonoDelta when the deadline is specified as MonoTime. Updated corresponding tests for Synchronizer as well. Change-Id: I00586ac1ba49494ff08abae0d452ab9286a3e56f Reviewed-on: http://gerrit.cloudera.org:8080/13255 Reviewed-by: Andrew Wong <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- src/kudu/master/hms_notification_log_listener.cc | 2 +- src/kudu/util/async_util-test.cc | 90 +++++++++++++++++------- src/kudu/util/async_util.h | 9 ++- 3 files changed, 75 insertions(+), 26 deletions(-) diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc index b34d233..2237a4b 100644 --- a/src/kudu/master/hms_notification_log_listener.cc +++ b/src/kudu/master/hms_notification_log_listener.cc @@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline) wake_up_cv_.Signal(); } - RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()), + RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline), "failed to wait for Hive Metastore notification log listener to catch up"); return Status::OK(); } diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc index 5cb7a63..91f2baa 100644 --- a/src/kudu/util/async_util-test.cc +++ b/src/kudu/util/async_util-test.cc @@ -28,6 +28,7 @@ #include "kudu/gutil/basictypes.h" #include "kudu/gutil/callback.h" #include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest { // Set up an alarm to fail the test in case of deadlock. alarm(30); } - ~AsyncUtilTest() { + virtual ~AsyncUtilTest() { // Disable the alarm on test exit. alarm(0); } @@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) { } } -TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) { - thread waiter; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(5)); - cb.Run(Status::OK()); - }); - ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000))); - } - waiter.join(); +// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil(). +enum class TimedWaitFlavor { + WaitFor, + WaitUntil, +}; - { - Synchronizer sync; - auto cb = sync.AsStatusCallback(); - waiter = thread([cb] { - SleepFor(MonoDelta::FromMilliseconds(1000)); - cb.Run(Status::OK()); - }); - ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut()); +class AsyncUtilTimedWaitTest: + public AsyncUtilTest, + public ::testing::WithParamInterface<TimedWaitFlavor> { +}; + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(1000); + + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(5)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_OK(sync.WaitFor(kWaitInterval)); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval)); + break; + default: + FAIL() << "unsupported wait mode " << static_cast<int>(mode); + break; } +} + +TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) { + const auto kWaitInterval = MonoDelta::FromMilliseconds(5); - // Waiting on the thread gives TSAN to check that no thread safety issues - // occurred. - waiter.join(); + Synchronizer sync; + auto cb = sync.AsStatusCallback(); + auto waiter = thread([cb] { + SleepFor(MonoDelta::FromMilliseconds(1000)); + cb.Run(Status::OK()); + }); + SCOPED_CLEANUP({ + waiter.join(); + }); + const auto mode = GetParam(); + switch (mode) { + case TimedWaitFlavor::WaitFor: + ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut()); + break; + case TimedWaitFlavor::WaitUntil: + ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut()); + break; + default: + FAIL() << "unsupported wait mode " << static_cast<int>(mode); + break; + } } + +INSTANTIATE_TEST_CASE_P(WaitFlavors, + AsyncUtilTimedWaitTest, + ::testing::Values(TimedWaitFlavor::WaitFor, + TimedWaitFlavor::WaitUntil)); + } // namespace kudu diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h index 338c6c2..61621d6 100644 --- a/src/kudu/util/async_util.h +++ b/src/kudu/util/async_util.h @@ -44,7 +44,7 @@ namespace kudu { class Synchronizer { public: Synchronizer() - : data_(std::make_shared<Data>()) { + : data_(std::make_shared<Data>()) { } void StatusCB(const Status& status) { @@ -71,6 +71,13 @@ class Synchronizer { return data_->status; } + Status WaitUntil(const MonoTime& deadline) const { + if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) { + return Status::TimedOut("timed out while waiting for the callback to be called"); + } + return data_->status; + } + void Reset() { data_->latch.Reset(1); }
