IMPALA-7215: Implement a templatized CountingBarrier

Currently, our CountingBarrier util only notifies a 'bool' value
and uses an underlying Promise<bool>.

We're seeing cases in the code where we might want waiters to be
notified with a value of a type other than bool. We add a templatized
class TypedCountingBarrier<T> and convert CountingBarrier to use
TypedCountingBarrier<bool> internally.

This was identified while working on IMPALA-7163.

Testing: Ran 'core' tests.

Change-Id: I05fc79228250408ae16481ae7ff3491a90d26b8e
Reviewed-on: http://gerrit.cloudera.org:8080/10827
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4ce2af9f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4ce2af9f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4ce2af9f

Branch: refs/heads/master
Commit: 4ce2af9ff2340bba1bfb40a1ed3b5b39d6011f87
Parents: 8000c31
Author: Sailesh Mukil <[email protected]>
Authored: Tue Jun 26 11:20:12 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 28 21:18:25 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc  |  2 +-
 be/src/rpc/rpc-mgr-test.cc     |  6 ++-
 be/src/runtime/coordinator.cc  |  2 +-
 be/src/util/counting-barrier.h | 75 +++++++++++++++++++++++++++----------
 be/src/util/hdfs-bulk-ops.cc   |  2 +-
 5 files changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index fe2eee7..e9c0864 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -81,7 +81,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     RETURN_IF_ERROR(IssueInitialScanRanges(state));
 
     // Release the scanner threads
-    ranges_issued_barrier_.Notify();
+    discard_result(ranges_issued_barrier_.Notify());
 
     if (progress_.done()) SetDone();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index ed91812..1e11099 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -225,8 +225,10 @@ TEST_F(RpcMgrTest, AsyncCall) {
     ScanMemResponsePB response;
     SetupScanMemRequest(&request, &controller);
     CountingBarrier barrier(1);
-    scan_mem_proxy->ScanMemAsync(request, &response, &controller,
-        [barrier_ptr = &barrier]() { barrier_ptr->Notify(); });
+    scan_mem_proxy->ScanMemAsync(
+        request, &response, &controller, [barrier_ptr = &barrier]() {
+          discard_result(barrier_ptr->Notify());
+        });
     // TODO: Inject random cancellation here.
     barrier.Wait();
     ASSERT_TRUE(controller.status().ok()) << controller.status().ToString();

http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index b790f22..1567c9a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -706,7 +706,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
               TNetworkAddressToString(backend_state->impalad_address())));
     }
     // We've applied all changes from the final status report - notify waiting threads.
-    backend_exec_complete_barrier_->Notify();
+    discard_result(backend_exec_complete_barrier_->Notify());
   }
   // If query execution has terminated, return a cancelled status to force the fragment
   // instance to stop executing.

http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 827c526..c1bc805 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -23,51 +23,88 @@
 
 namespace impala {
 
-/// Allows clients to wait for the arrival of a fixed number of notifications before they
-/// are allowed to continue.
-class CountingBarrier {
+/// Allows clients to wait for the arrival of a fixed number of notifications after which
+/// they are returned a value of type 'T' and allowed to continue.
+template <typename T>
+class TypedCountingBarrier {
  public:
-  /// Initialises the CountingBarrier with `count` pending notifications.
-  CountingBarrier(int32_t count) : count_(count) {
-    DCHECK_GT(count, 0);
-  }
+  /// Initialises the TypedCountingBarrier with `count` pending notifications.
+  TypedCountingBarrier(int32_t count) : count_(count) { DCHECK_GT(count, 0); }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
   /// Returns the remaining pending notifications.
-  int32_t Notify() {
+  /// If this is the final notifier, it unblocks Wait() with the returned value as
+  /// 'promise_value'.
+  int32_t Notify(const T& promise_value) {
     int32_t result = count_.Add(-1);
-    if (result == 0) promise_.Set(true);
+    if (result == 0) promise_.Set(promise_value);
     return result;
   }
 
-  /// Sets the number of pending notifications to 0 and unblocks Wait().
-  void NotifyRemaining() {
+  /// Sets the number of pending notifications to 0 and unblocks Wait() with the returned
+  /// value as 'promise_value'.
+  void NotifyRemaining(const T& promise_value) {
     while (true) {
       int32_t value = count_.Load();
       if (value <= 0) return;  // count_ can legitimately drop below 0
       if (count_.CompareAndSwap(value, 0)) {
-        promise_.Set(true);
+        promise_.Set(promise_value);
         return;
       }
     }
   }
 
-  /// Blocks until all notifications are received.
-  void Wait() { promise_.Get(); }
+  /// Blocks until all notifications are received. Returns the value set by
+  /// Notify() or NotifyRemaining().
+  const T& Wait() { return promise_.Get(); }
 
   /// Blocks until all notifications are received, or until 'timeout_ms' passes, in which
-  /// case '*timed_out' will be true.
-  void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
+  /// case '*timed_out' will be true. If '*timed_out' is false, then returns the value set
+  /// by Notify() or NotifyRemaining().
+  const T& Wait(int64_t timeout_ms, bool* timed_out) {
+    return promise_.Get(timeout_ms, timed_out);
+  }
 
   int32_t pending() const { return count_.Load(); }
 
  private:
   /// Used to signal waiters when all notifications are received.
-  Promise<bool> promise_;
+  Promise<T> promise_;
 
   /// The number of pending notifications remaining.
   AtomicInt32 count_;
 
+  DISALLOW_COPY_AND_ASSIGN(TypedCountingBarrier);
+};
+
+/// Wrapper around TypedCountingBarrier<T> which allows clients to wait for the arrival
+/// of a fixed number of notifications after which they are allowed to continue.
+class CountingBarrier {
+ public:
+  /// Initialises the CountingBarrier with `count` pending notifications.
+  CountingBarrier(int32_t count) : barrier_(count) { DCHECK_GT(count, 0); }
+
+  /// Sends one notification, decrementing the number of pending notifications by one.
+  /// Returns the remaining pending notifications.
+  int32_t Notify() { return barrier_.Notify(true); }
+
+  /// Sets the number of pending notifications to 0 and unblocks Wait().
+  void NotifyRemaining() { barrier_.NotifyRemaining(true); }
+
+  /// Blocks until all notifications are received.
+  void Wait() { discard_result(barrier_.Wait()); }
+
+  /// Blocks until all notifications are received, or until 'timeout_ms' passes, in which
+  /// case '*timed_out' will be true.
+  void Wait(int64_t timeout_ms, bool* timed_out) {
+    discard_result(barrier_.Wait(timeout_ms, timed_out));
+  }
+
+  int32_t pending() const { return barrier_.pending(); }
+
+ private:
+  TypedCountingBarrier<bool> barrier_;
+
   DISALLOW_COPY_AND_ASSIGN(CountingBarrier);
 };
 
@@ -79,9 +116,7 @@ class NotifyBarrierOnExit {
     DCHECK(b != NULL);
   }
 
-  ~NotifyBarrierOnExit() {
-    barrier->Notify();
-  }
+  ~NotifyBarrierOnExit() { discard_result(barrier->Notify()); }
 
  private:
   CountingBarrier* barrier;

http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/util/hdfs-bulk-ops.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.cc b/be/src/util/hdfs-bulk-ops.cc
index c5c7ad5..53d5c7a 100644
--- a/be/src/util/hdfs-bulk-ops.cc
+++ b/be/src/util/hdfs-bulk-ops.cc
@@ -182,7 +182,7 @@ void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) {
 }
 
 void HdfsOperationSet::MarkOneOpDone() {
-  ops_complete_barrier_->Notify();
+  discard_result(ops_complete_barrier_->Notify());
 }
 
 bool HdfsOperationSet::ShouldAbort() {

Reply via email to