IMPALA-7215: Implement a templatized CountingBarrier Currently, our CountingBarrier util only notifies a 'bool' value and uses an underlying Promise<bool>.
We're seeing cases in code where we might want to be notified of a different kind of Promise (other than bool). We add a templatized class TypedCountingBarrier<T> and convert CountingBarrier to use the TypedCountingBarrier<T> internally. This was identified while working on IMPALA-7163. Testing: Ran 'core' tests. Change-Id: I05fc79228250408ae16481ae7ff3491a90d26b8e Reviewed-on: http://gerrit.cloudera.org:8080/10827 Reviewed-by: Sailesh Mukil <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4ce2af9f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4ce2af9f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4ce2af9f Branch: refs/heads/master Commit: 4ce2af9ff2340bba1bfb40a1ed3b5b39d6011f87 Parents: 8000c31 Author: Sailesh Mukil <[email protected]> Authored: Tue Jun 26 11:20:12 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jun 28 21:18:25 2018 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-scan-node.cc | 2 +- be/src/rpc/rpc-mgr-test.cc | 6 ++- be/src/runtime/coordinator.cc | 2 +- be/src/util/counting-barrier.h | 75 +++++++++++++++++++++++++++---------- be/src/util/hdfs-bulk-ops.cc | 2 +- 5 files changed, 62 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index fe2eee7..e9c0864 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -81,7 +81,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos RETURN_IF_ERROR(IssueInitialScanRanges(state)); // Release the scanner threads - ranges_issued_barrier_.Notify(); + discard_result(ranges_issued_barrier_.Notify()); if (progress_.done()) SetDone(); } http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/rpc/rpc-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc index ed91812..1e11099 100644 --- a/be/src/rpc/rpc-mgr-test.cc +++ b/be/src/rpc/rpc-mgr-test.cc @@ -225,8 +225,10 @@ TEST_F(RpcMgrTest, AsyncCall) { ScanMemResponsePB response; SetupScanMemRequest(&request, &controller); CountingBarrier barrier(1); - scan_mem_proxy->ScanMemAsync(request, &response, &controller, - [barrier_ptr = &barrier]() { barrier_ptr->Notify(); }); + scan_mem_proxy->ScanMemAsync( + request, &response, &controller, [barrier_ptr = &barrier]() { + discard_result(barrier_ptr->Notify()); + }); // TODO: Inject random cancellation here. barrier.Wait(); ASSERT_TRUE(controller.status().ok()) << controller.status().ToString(); http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index b790f22..1567c9a 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -706,7 +706,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param TNetworkAddressToString(backend_state->impalad_address()))); } // We've applied all changes from the final status report - notify waiting threads. - backend_exec_complete_barrier_->Notify(); + discard_result(backend_exec_complete_barrier_->Notify()); } // If query execution has terminated, return a cancelled status to force the fragment // instance to stop executing. http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/util/counting-barrier.h ---------------------------------------------------------------------- diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h index 827c526..c1bc805 100644 --- a/be/src/util/counting-barrier.h +++ b/be/src/util/counting-barrier.h @@ -23,51 +23,88 @@ namespace impala { -/// Allows clients to wait for the arrival of a fixed number of notifications before they -/// are allowed to continue. -class CountingBarrier { +/// Allows clients to wait for the arrival of a fixed number of notifications after which +/// they are returned a value of type 'T' and allowed to continue. +template <typename T> +class TypedCountingBarrier { public: - /// Initialises the CountingBarrier with `count` pending notifications. - CountingBarrier(int32_t count) : count_(count) { - DCHECK_GT(count, 0); - } + /// Initialises the TypedCountingBarrier with `count` pending notifications. + TypedCountingBarrier(int32_t count) : count_(count) { DCHECK_GT(count, 0); } /// Sends one notification, decrementing the number of pending notifications by one. /// Returns the remaining pending notifications. - int32_t Notify() { + /// If this is the final notifier, it unblocks Wait() with the returned value as + /// 'promise_value'. + int32_t Notify(const T& promise_value) { int32_t result = count_.Add(-1); - if (result == 0) promise_.Set(true); + if (result == 0) promise_.Set(promise_value); return result; } - /// Sets the number of pending notifications to 0 and unblocks Wait(). - void NotifyRemaining() { + /// Sets the number of pending notifications to 0 and unblocks Wait() with the returned + /// value as 'promise_value'. + void NotifyRemaining(const T& promise_value) { while (true) { int32_t value = count_.Load(); if (value <= 0) return; // count_ can legitimately drop below 0 if (count_.CompareAndSwap(value, 0)) { - promise_.Set(true); + promise_.Set(promise_value); return; } } } - /// Blocks until all notifications are received. - void Wait() { promise_.Get(); } + /// Blocks until all notifications are received. Returns the value set by + /// Notify() or NotifyRemaining(). + const T& Wait() { return promise_.Get(); } /// Blocks until all notifications are received, or until 'timeout_ms' passes, in which - /// case '*timed_out' will be true. - void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); } + /// case '*timed_out' will be true. If '*timed_out' is false, then returns the value set + /// by Notify() or NotifyRemaining(). + const T& Wait(int64_t timeout_ms, bool* timed_out) { + return promise_.Get(timeout_ms, timed_out); + } int32_t pending() const { return count_.Load(); } private: /// Used to signal waiters when all notifications are received. - Promise<bool> promise_; + Promise<T> promise_; /// The number of pending notifications remaining. AtomicInt32 count_; + DISALLOW_COPY_AND_ASSIGN(TypedCountingBarrier); +}; + +/// Wrapper around TypedCountingBarrier<T> which allows clients to wait for the arrival +/// of a fixed number of notifications after which they are allowed to continue. +class CountingBarrier { + public: + /// Initialises the CountingBarrier with `count` pending notifications. + CountingBarrier(int32_t count) : barrier_(count) { DCHECK_GT(count, 0); } + + /// Sends one notification, decrementing the number of pending notifications by one. + /// Returns the remaining pending notifications. + int32_t Notify() { return barrier_.Notify(true); } + + /// Sets the number of pending notifications to 0 and unblocks Wait(). + void NotifyRemaining() { barrier_.NotifyRemaining(true); } + + /// Blocks until all notifications are received. + void Wait() { discard_result(barrier_.Wait()); } + + /// Blocks until all notifications are received, or until 'timeout_ms' passes, in which + /// case '*timed_out' will be true. + void Wait(int64_t timeout_ms, bool* timed_out) { + discard_result(barrier_.Wait(timeout_ms, timed_out)); + } + + int32_t pending() const { return barrier_.pending(); } + + private: + TypedCountingBarrier<bool> barrier_; + DISALLOW_COPY_AND_ASSIGN(CountingBarrier); }; @@ -79,9 +116,7 @@ class NotifyBarrierOnExit { DCHECK(b != NULL); } - ~NotifyBarrierOnExit() { - barrier->Notify(); - } + ~NotifyBarrierOnExit() { discard_result(barrier->Notify()); } private: CountingBarrier* barrier; http://git-wip-us.apache.org/repos/asf/impala/blob/4ce2af9f/be/src/util/hdfs-bulk-ops.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-bulk-ops.cc b/be/src/util/hdfs-bulk-ops.cc index c5c7ad5..53d5c7a 100644 --- a/be/src/util/hdfs-bulk-ops.cc +++ b/be/src/util/hdfs-bulk-ops.cc @@ -182,7 +182,7 @@ void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) { } void HdfsOperationSet::MarkOneOpDone() { - ops_complete_barrier_->Notify(); + discard_result(ops_complete_barrier_->Notify()); } bool HdfsOperationSet::ShouldAbort() {
