IMPALA-6449: Use CLOCK_MONOTONIC in ConditionVariable

ConditionVariable is a thin wrapper around pthread_cond_*.
Currently, pthread_cond_timedwait() uses the default clock attribute,
CLOCK_REALTIME. This clock is susceptible to adjustments of the system
clock from various sources such as NTP, so time may go backward.
This change fixes the problem by switching to using CLOCK_MONOTONIC
so time will be monotonic although the frequency of the clock ticks
may still be adjusted by NTP. Ideally, we should use CLOCK_MONOTONIC_RAW
but it's available only on Linux kernel 2.6.28 or later. This change
also gets rid of some usages of boost::get_system_time(), which suffers
from the same problem.

Change-Id: I81611cfd5e7c5347203fe7fa6b0f615602257f87
Reviewed-on: http://gerrit.cloudera.org:8080/9158
Reviewed-by: Michael Ho <k...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ee74a627
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ee74a627
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ee74a627

Branch: refs/heads/2.x
Commit: ee74a6277dec51fd1cd32acfdfb7821174451c03
Parents: d747670
Author: Michael Ho <k...@cloudera.com>
Authored: Mon Jan 29 18:07:28 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/thrift-server.cc               |  4 +--
 be/src/runtime/fragment-instance-state.cc |  7 ++---
 be/src/service/impala-server.cc           |  2 +-
 be/src/util/blocking-queue.h              |  6 ++--
 be/src/util/condition-variable.h          | 40 +++++++++++++-------------
 be/src/util/promise.h                     |  8 ++----
 be/src/util/time.h                        | 13 +++++++++
 7 files changed, 44 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ded710e..48fb1b9 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -165,8 +165,8 @@ Status 
ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
       &ThriftServer::ThriftServerEventProcessor::Supervise, this,
       &thrift_server_->server_thread_));
 
-  system_time deadline = get_system_time() +
-      
posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
+  timespec deadline;
+  TimeFromNowMillis(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS, 
&deadline);
 
   // Loop protects against spurious wakeup. Locks provide necessary fences to 
ensure
   // visibility.

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 16b4a7e..ad9e99e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -341,19 +341,16 @@ void FragmentInstanceState::ReportProfileThread() {
   // updates at once so its better for contention as well as smoother progress
   // reporting.
   int report_fragment_offset = rand() % FLAGS_status_report_interval;
-  boost::posix_time::seconds wait_duration(report_fragment_offset);
   // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.WaitFor(l, wait_duration);
+  stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_SEC);
 
   while (report_thread_active_) {
-    boost::posix_time::seconds 
loop_wait_duration(FLAGS_status_report_interval);
-
     // timed_wait can return because the timeout occurred or the condition 
variable
     // was signaled.  We can't rely on its return value to distinguish between 
the
     // two cases (e.g. there is a race here where the wait timed out but 
before grabbing
     // the lock, the condition variable was signaled).  Instead, we will use 
an external
     // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.WaitFor(l, loop_wait_duration);
+    stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval * 
MICROS_PER_SEC);
 
     if (!report_thread_active_) break;
     SendReport(false, Status::OK());

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 0c5f75b..cf5f5fb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1772,7 +1772,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t 
session_timeout) {
         session_timeout_cv_.Wait(timeout_lock);
       } else {
         // Sleep for a second before checking whether an active session can be 
expired.
-        session_timeout_cv_.WaitFor(timeout_lock, seconds(1));
+        session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a4b1b8f..1dd90d5 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -138,13 +138,13 @@ class BlockingQueue : public CacheLineAligned {
   bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
-    boost::system_time wtime = boost::get_system_time() +
-        boost::posix_time::microseconds(timeout_micros);
+    timespec abs_time;
+    TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
     while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
-      notified = put_cv_.WaitUntil(write_lock, wtime);
+      notified = put_cv_.WaitUntil(write_lock, abs_time);
       timer.Stop();
     }
     total_put_wait_time_ += timer.ElapsedTime();

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
index c1a1e56..e463790 100644
--- a/be/src/util/condition-variable.h
+++ b/be/src/util/condition-variable.h
@@ -24,13 +24,23 @@
 #include <pthread.h>
 #include <unistd.h>
 
+#include "util/time.h"
+
 namespace impala {
 
 /// Simple wrapper around POSIX pthread condition variable. This has lower 
overhead than
 /// boost's implementation as it doesn't implement boost thread interruption.
 class ConditionVariable {
  public:
-  ConditionVariable() { pthread_cond_init(&cv_, NULL); }
+  ConditionVariable() {
+    pthread_condattr_t attrs;
+    int retval = pthread_condattr_init(&attrs);
+    DCHECK_EQ(0, retval);
+    pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
+    retval = pthread_cond_init(&cv_, &attrs);
+    DCHECK_EQ(0, retval);
+    pthread_condattr_destroy(&attrs);
+  }
 
   ~ConditionVariable() { pthread_cond_destroy(&cv_); }
 
@@ -41,32 +51,22 @@ class ConditionVariable {
     pthread_cond_wait(&cv_, mutex);
   }
 
-  /// Wait until the condition variable is notified or 'timeout' has passed.
+  /// Wait until the condition variable is notified or 'abs_time' has passed.
   /// Returns true if the condition variable is notified before the absolute 
timeout
-  /// specified in 'timeout' has passed. Returns false otherwise.
-  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
-      const timespec& abs_time) {
+  /// specified in 'abs_time' has passed. Returns false otherwise.
+  bool WaitUntil(boost::unique_lock<boost::mutex>& lock, const timespec& 
abs_time) {
     DCHECK(lock.owns_lock());
     pthread_mutex_t* mutex = lock.mutex()->native_handle();
     return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0;
   }
 
-  /// Wait until the condition variable is notified or 'abs_time' has passed.
-  /// Returns true if the condition variable is notified before the absolute 
timeout
-  /// specified in 'abs_time' has passed. Returns false otherwise.
-  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
-      const boost::system_time& abs_time) {
-    return WaitUntil(lock, to_timespec(abs_time));
-  }
-
-  /// Wait until the condition variable is notified or have waited for the time
-  /// specified in 'wait_duration'.
-  /// Returns true if the condition variable is notified in time.
+  /// Wait until the condition variable is notified or 'duration_us' 
microseconds
+  /// have passed. Returns true if the condition variable is notified in time.
   /// Returns false otherwise.
-  template <typename duration_type>
-  bool WaitFor(boost::unique_lock<boost::mutex>& lock,
-      const duration_type& wait_duration) {
-    return WaitUntil(lock, to_timespec(boost::get_system_time() + 
wait_duration));
+  bool WaitFor(boost::unique_lock<boost::mutex>& lock, int64_t duration_us) {
+    timespec deadline;
+    TimeFromNowMicros(duration_us, &deadline);
+    return WaitUntil(lock, deadline);
   }
 
   /// Notify a single waiter on this condition variable.

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/util/promise.h b/be/src/util/promise.h
index 5de2d13..c93d9f2 100644
--- a/be/src/util/promise.h
+++ b/be/src/util/promise.h
@@ -77,17 +77,15 @@ class Promise {
   /// timed_out: Indicates whether Get() returned due to timeout. Must be 
non-NULL.
   const T& Get(int64_t timeout_millis, bool* timed_out) {
     DCHECK_GT(timeout_millis, 0);
-    int64_t timeout_micros = timeout_millis * 1000;
+    int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
     DCHECK(timed_out != NULL);
     boost::unique_lock<boost::mutex> l(val_lock_);
     int64_t start;
     int64_t now;
     now = start = MonotonicMicros();
     while (!val_is_set_ && (now - start) < timeout_micros) {
-      boost::posix_time::microseconds wait_time =
-          boost::posix_time::microseconds(std::max<int64_t>(
-              1, timeout_micros - (now - start)));
-      val_set_cond_.WaitFor(l, wait_time);
+      int64_t wait_time_micros = std::max<int64_t>(1, timeout_micros - (now - 
start));
+      val_set_cond_.WaitFor(l, wait_time_micros);
       now = MonotonicMicros();
     }
     *timed_out = !val_is_set_;

http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/time.h
----------------------------------------------------------------------
diff --git a/be/src/util/time.h b/be/src/util/time.h
index cef14c8..64dbf9c 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -57,6 +57,19 @@ inline int64_t UnixMillis() {
   return GetCurrentTimeMicros() / MICROS_PER_MILLI;
 }
 
+/// Return the time 'time_us' microseconds away from now in 'abs_time'.
+inline void TimeFromNowMicros(int64_t time_us, timespec* abs_time) {
+  clock_gettime(CLOCK_MONOTONIC, abs_time);
+  abs_time->tv_nsec += (time_us % MICROS_PER_SEC) * NANOS_PER_MICRO;
+  abs_time->tv_sec += time_us / MICROS_PER_SEC + abs_time->tv_nsec / 
NANOS_PER_SEC;
+  abs_time->tv_nsec %= NANOS_PER_SEC;
+}
+
+/// Return the time 'time_ms' milliseconds away from now in 'abs_time'.
+inline void TimeFromNowMillis(int64_t time_ms, timespec* abs_time) {
+  TimeFromNowMicros(time_ms * MICROS_PER_MILLI, abs_time);
+}
+
 /// Returns the number of microseconds that have passed since the Unix epoch. 
This is
 /// affected by manual changes to the system clock but is more suitable for 
use across
 /// a cluster. For more accurate timings on the local host use the monotonic 
functions

Reply via email to