IMPALA-6134: Update code base to use impala::ConditionVariable. boost::condition_variable supports thread interruption, which has some overhead. In some places we already use impala::ConditionVariable, which is a very thin layer around pthread and therefore has less overhead.
This commit substitutes every boost::condition_variable in the codebase (except under kudu/) with impala::ConditionVariable. It also extends the impala::ConditionVariable class to support waiting with a given timeout. The WaitFor function takes a duration as a parameter. The WaitUntil function takes an absolute time as a parameter. Change-Id: I3085c6dcb42350b61244df6e7f091a1e7db356c9 Reviewed-on: http://gerrit.cloudera.org:8080/8428 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1673e726 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1673e726 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1673e726 Branch: refs/heads/master Commit: 1673e726bb5dc64d316d699f2f63fa00ac321819 Parents: 9923b82 Author: Zoltan Borok-Nagy <[email protected]> Authored: Tue Oct 31 17:30:26 2017 +0100 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Nov 8 02:16:26 2017 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog-server.cc | 6 ++--- be/src/catalog/catalog-server.h | 3 ++- be/src/exec/plan-root-sink.cc | 16 ++++++------ be/src/exec/plan-root-sink.h | 7 +++-- be/src/rpc/thrift-server.cc | 12 ++++----- be/src/runtime/coordinator.cc | 6 ++--- be/src/runtime/coordinator.h | 4 +-- be/src/runtime/data-stream-mgr.h | 1 - be/src/runtime/data-stream-recvr.cc | 25 +++++++++--------- be/src/runtime/data-stream-sender.cc | 8 +++--- be/src/runtime/disk-io-mgr-internal.h | 11 ++++---- be/src/runtime/disk-io-mgr-reader-context.cc | 2 +- be/src/runtime/disk-io-mgr-scan-range.cc | 6 ++--- be/src/runtime/disk-io-mgr-stress.cc | 2 ++ be/src/runtime/disk-io-mgr-stress.h | 3 --- be/src/runtime/disk-io-mgr-test.cc | 17 ++++++------- be/src/runtime/disk-io-mgr.cc | 12 ++++----- be/src/runtime/disk-io-mgr.h | 4 +-- 
be/src/runtime/fragment-instance-state.cc | 16 +++++------- be/src/runtime/fragment-instance-state.h | 5 ++-- be/src/scheduling/admission-controller.cc | 8 +++--- be/src/scheduling/admission-controller.h | 3 ++- be/src/service/client-request-state.cc | 4 +-- be/src/service/client-request-state.h | 3 ++- be/src/service/impala-server.cc | 16 +++++------- be/src/service/impala-server.h | 5 ++-- be/src/statestore/statestore.h | 1 - be/src/util/blocking-queue.h | 4 +-- be/src/util/condition-variable.h | 31 ++++++++++++++++++----- be/src/util/promise.h | 11 ++++---- be/src/util/thread-pool.h | 7 ++--- 31 files changed, 137 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index b4745fe..b004b22 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -194,7 +194,7 @@ Status CatalogServer::Start() { // Notify the thread to start for the first time. { lock_guard<mutex> l(catalog_lock_); - catalog_update_cv_.notify_one(); + catalog_update_cv_.NotifyOne(); } return Status::OK(); } @@ -253,7 +253,7 @@ void CatalogServer::UpdateCatalogTopicCallback( // Signal the catalog update gathering thread to start. topic_updates_ready_ = false; - catalog_update_cv_.notify_one(); + catalog_update_cv_.NotifyOne(); } [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() { @@ -264,7 +264,7 @@ void CatalogServer::UpdateCatalogTopicCallback( // when topic_updates_ready_ is false, otherwise we may be in the middle of // processing a heartbeat. 
while (topic_updates_ready_) { - catalog_update_cv_.wait(unique_lock); + catalog_update_cv_.Wait(unique_lock); } MonotonicStopWatch sw; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.h ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h index 452a9b9..78a3f20 100644 --- a/be/src/catalog/catalog-server.h +++ b/be/src/catalog/catalog-server.h @@ -29,6 +29,7 @@ #include "gen-cpp/Types_types.h" #include "catalog/catalog.h" #include "statestore/statestore-subscriber.h" +#include "util/condition-variable.h" #include "util/metrics.h" #include "rapidjson/rapidjson.h" @@ -95,7 +96,7 @@ class CatalogServer { /// fetch its next set of updates from the JniCatalog. At the end of each statestore /// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts /// querying the JniCatalog for catalog objects. Protected by the catalog_lock_. - boost::condition_variable catalog_update_cv_; + ConditionVariable catalog_update_cv_; /// The latest available set of catalog topic updates (additions/modifications, and /// deletions). Set by the catalog_update_gathering_thread_ and protected by http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index 9c20ff3..c4ad604 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -73,7 +73,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { // written clients may not cope correctly with them. See IMPALA-4335. 
while (current_batch_row < batch->num_rows()) { unique_lock<mutex> l(lock_); - while (results_ == nullptr && !consumer_done_) sender_cv_.wait(l); + while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l); if (consumer_done_ || batch == nullptr) { eos_ = true; return Status::OK(); @@ -101,7 +101,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { expr_results_pool_->Clear(); // Signal the consumer. results_ = nullptr; - consumer_cv_.notify_all(); + consumer_cv_.NotifyAll(); } return Status::OK(); } @@ -110,7 +110,7 @@ Status PlanRootSink::FlushFinal(RuntimeState* state) { unique_lock<mutex> l(lock_); sender_done_ = true; eos_ = true; - consumer_cv_.notify_all(); + consumer_cv_.NotifyAll(); return Status::OK(); } @@ -119,17 +119,17 @@ void PlanRootSink::Close(RuntimeState* state) { // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as // well. sender_done_ = true; - consumer_cv_.notify_all(); + consumer_cv_.NotifyAll(); // Wait for consumer to be done, in case sender tries to tear-down this sink while the // sender is still reading from it. 
- while (!consumer_done_) sender_cv_.wait(l); + while (!consumer_done_) sender_cv_.Wait(l); DataSink::Close(state); } void PlanRootSink::CloseConsumer() { unique_lock<mutex> l(lock_); consumer_done_ = true; - sender_cv_.notify_all(); + sender_cv_.NotifyAll(); } Status PlanRootSink::GetNext( @@ -138,9 +138,9 @@ Status PlanRootSink::GetNext( results_ = results; num_rows_requested_ = num_results; - sender_cv_.notify_all(); + sender_cv_.NotifyAll(); - while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l); + while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l); *eos = eos_; return state->GetQueryStatus(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h index 654bd27..f367d04 100644 --- a/be/src/exec/plan-root-sink.h +++ b/be/src/exec/plan-root-sink.h @@ -19,8 +19,7 @@ #define IMPALA_EXEC_PLAN_ROOT_SINK_H #include "exec/data-sink.h" - -#include <boost/thread/condition_variable.hpp> +#include "util/condition-variable.h" namespace impala { @@ -95,12 +94,12 @@ class PlanRootSink : public DataSink { /// num_rows_requested_, and so the sender may begin satisfying that request for rows /// from its current batch. Also signalled when CloseConsumer() is called, to unblock /// the sender. - boost::condition_variable sender_cv_; + ConditionVariable sender_cv_; /// Waited on by the consumer only. Signalled when the sender has finished serving a /// request for rows. Also signalled by Close() and FlushFinal() to signal to the /// consumer that no more rows are coming. - boost::condition_variable consumer_cv_; + ConditionVariable consumer_cv_; /// Signals to producer that the consumer is done, and the sink may be torn down. 
bool consumer_done_ = false; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/rpc/thrift-server.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc index 5bf47b2..ab51315 100644 --- a/be/src/rpc/thrift-server.cc +++ b/be/src/rpc/thrift-server.cc @@ -18,7 +18,6 @@ #include <boost/filesystem.hpp> #include <boost/thread.hpp> #include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/uuid/uuid_io.hpp> #include <thrift/concurrency/Thread.h> @@ -37,6 +36,7 @@ #include "rpc/authentication.h" #include "rpc/thrift-server.h" #include "rpc/thrift-thread.h" +#include "util/condition-variable.h" #include "util/debug-util.h" #include "util/metrics.h" #include "util/network-util.h" @@ -139,13 +139,13 @@ class ThriftServer::ThriftServerEventProcessor : public TServerEventHandler { private: // Lock used to ensure that there are no missed notifications between starting the - // supervision thread and calling signal_cond_.timed_wait. Also used to ensure + // supervision thread and calling signal_cond_.WaitUntil. Also used to ensure // thread-safe access to members of thrift_server_ boost::mutex signal_lock_; // Condition variable that is notified by the supervision thread once either // a) all is well or b) an error occurred. - boost::condition_variable signal_cond_; + ConditionVariable signal_cond_; // The ThriftServer under management. This class is a friend of ThriftServer, and // reaches in to change member variables at will. @@ -179,7 +179,7 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() { // visibility. 
while (!signal_fired_) { // Yields lock and allows supervision thread to continue and signal - if (!signal_cond_.timed_wait(lock, deadline)) { + if (!signal_cond_.WaitUntil(lock, deadline)) { stringstream ss; ss << "ThriftServer '" << thrift_server_->name_ << "' (on port: " << thrift_server_->port_ << ") did not start within " @@ -220,7 +220,7 @@ void ThriftServer::ThriftServerEventProcessor::Supervise() { // failure, for example. signal_fired_ = true; } - signal_cond_.notify_all(); + signal_cond_.NotifyAll(); } void ThriftServer::ThriftServerEventProcessor::preServe() { @@ -234,7 +234,7 @@ void ThriftServer::ThriftServerEventProcessor::preServe() { thrift_server_->started_ = true; // Should only be one thread waiting on signal_cond_, but wake all just in case. - signal_cond_.notify_all(); + signal_cond_.NotifyAll(); } // This thread-local variable contains the current connection context for whichever http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 7fe6fb7..ae0c9ad 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -796,7 +796,7 @@ Status Coordinator::WaitForBackendCompletion() { while (num_remaining_backends_ > 0 && query_status_.ok()) { VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining_backends_ << " remaining"; - backend_completion_cv_.wait(l); + backend_completion_cv_.Wait(l); } if (query_status_.ok()) { VLOG_QUERY << "All backends finished successfully."; @@ -913,7 +913,7 @@ void Coordinator::CancelInternal() { VLOG_QUERY << Substitute( "CancelBackends() query_id=$0, tried to cancel $1 backends", PrintId(query_id()), num_cancelled); - backend_completion_cv_.notify_all(); + backend_completion_cv_.NotifyAll(); ReleaseExecResourcesLocked(); ReleaseAdmissionControlResourcesLocked(); @@ -966,7 +966,7 @@ 
Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param BackendState::LogFirstInProgress(backend_states_); } if (--num_remaining_backends_ == 0 || !status.ok()) { - backend_completion_cv_.notify_all(); + backend_completion_cv_.NotifyAll(); } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 3549ae9..e7ddee9 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -28,7 +28,6 @@ #include <boost/accumulators/statistics/stats.hpp> #include <boost/accumulators/statistics/variance.hpp> #include <boost/scoped_ptr.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/thread/mutex.hpp> #include <boost/unordered_map.hpp> #include <boost/unordered_set.hpp> @@ -41,6 +40,7 @@ #include "gen-cpp/Types_types.h" #include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle #include "scheduling/query-schedule.h" +#include "util/condition-variable.h" #include "util/progress-updater.h" namespace impala { @@ -298,7 +298,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// If there is no coordinator fragment, Wait() simply waits until all /// backends report completion by notifying on backend_completion_cv_. /// Tied to lock_. - boost::condition_variable backend_completion_cv_; + ConditionVariable backend_completion_cv_; /// Count of the number of backends for which done != true. 
When this /// hits 0, any Wait()'ing thread is notified http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h index eff468e..1e9f4b6 100644 --- a/be/src/runtime/data-stream-mgr.h +++ b/be/src/runtime/data-stream-mgr.h @@ -22,7 +22,6 @@ #include <list> #include <set> #include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/unordered_map.hpp> #include <boost/unordered_set.hpp> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index d8150eb..cdea4a0 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -23,13 +23,12 @@ #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/sorted-run-merger.h" +#include "util/condition-variable.h" #include "util/runtime-profile-counters.h" #include "util/periodic-counter-updater.h" #include "common/names.h" -using boost::condition_variable; - namespace impala { // Implements a blocking queue of row batches from one or more senders. One queue @@ -83,10 +82,10 @@ class DataStreamRecvr::SenderQueue { int num_remaining_senders_; // signal arrival of new batch or the eos/cancelled condition - condition_variable data_arrival_cv_; + ConditionVariable data_arrival_cv_; // signal removal of data by stream consumer - condition_variable data_removal__cv_; + ConditionVariable data_removal__cv_; // queue of (batch length, batch) pairs. The SenderQueue block owns memory to // these batches. They are handed off to the caller via GetBatch. 
@@ -120,7 +119,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) { CANCEL_SAFE_SCOPED_TIMER( received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_, &is_cancelled_); - data_arrival_cv_.wait(l); + data_arrival_cv_.Wait(l); } // cur_batch_ must be replaced with the returned batch. @@ -140,7 +139,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) { recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first); VLOG_ROW << "fetched #rows=" << result->num_rows(); batch_queue_.pop_front(); - data_removal__cv_.notify_one(); + data_removal__cv_.NotifyOne(); current_batch_.reset(result); *next_batch = current_batch_.get(); return Status::OK(); @@ -175,10 +174,10 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) { try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_); if (timer_lock) { CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_wall_timer_, &is_cancelled_); - data_removal__cv_.wait(l); + data_removal__cv_.Wait(l); got_timer_lock = true; } else { - data_removal__cv_.wait(l); + data_removal__cv_.Wait(l); got_timer_lock = false; } } @@ -197,7 +196,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) { // time it takes this thread to finish (and yield lock_) and the // notified thread to be woken up and to acquire the try_lock. In // practice, this time is small relative to the total wait time. 
- if (got_timer_lock) data_removal__cv_.notify_one(); + if (got_timer_lock) data_removal__cv_.NotifyOne(); } if (!is_cancelled_) { @@ -213,7 +212,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) { << " batch_size=" << batch_size << "\n"; batch_queue_.push_back(make_pair(batch_size, batch)); recvr_->num_buffered_bytes_.Add(batch_size); - data_arrival_cv_.notify_one(); + data_arrival_cv_.NotifyOne(); } } @@ -225,7 +224,7 @@ void DataStreamRecvr::SenderQueue::DecrementSenders() { << recvr_->fragment_instance_id() << " node_id=" << recvr_->dest_node_id() << " #senders=" << num_remaining_senders_; - if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one(); + if (num_remaining_senders_ == 0) data_arrival_cv_.NotifyOne(); } void DataStreamRecvr::SenderQueue::Cancel() { @@ -239,8 +238,8 @@ void DataStreamRecvr::SenderQueue::Cancel() { } // Wake up all threads waiting to produce/consume batches. They will all // notice that the stream is cancelled and handle it. 
- data_arrival_cv_.notify_all(); - data_removal__cv_.notify_all(); + data_arrival_cv_.NotifyAll(); + data_removal__cv_.NotifyAll(); } void DataStreamRecvr::SenderQueue::Close() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc index dc26a23..ed156dc 100644 --- a/be/src/runtime/data-stream-sender.cc +++ b/be/src/runtime/data-stream-sender.cc @@ -33,6 +33,7 @@ #include "runtime/mem-tracker.h" #include "runtime/backend-client.h" #include "util/aligned-new.h" +#include "util/condition-variable.h" #include "util/debug-util.h" #include "util/network-util.h" #include "util/thread-pool.h" @@ -45,7 +46,6 @@ #include "common/names.h" -using boost::condition_variable; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; @@ -133,7 +133,7 @@ class DataStreamSender::Channel : public CacheLineAligned { // TODO: if the order of row batches does not matter, we can consider increasing // the number of threads. ThreadPool<TRowBatch*> rpc_thread_; // sender thread. - condition_variable rpc_done_cv_; // signaled when rpc_in_flight_ is set to true. + ConditionVariable rpc_done_cv_; // signaled when rpc_in_flight_ is set to true. mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_ bool rpc_in_flight_; // true if the rpc_thread_ is busy sending. 
@@ -188,7 +188,7 @@ void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* bat unique_lock<mutex> l(rpc_thread_lock_); rpc_in_flight_ = false; } - rpc_done_cv_.notify_one(); + rpc_done_cv_.NotifyOne(); } void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) { @@ -239,7 +239,7 @@ void DataStreamSender::Channel::WaitForRpc() { SCOPED_TIMER(parent_->state_->total_network_send_timer()); unique_lock<mutex> l(rpc_thread_lock_); while (rpc_in_flight_) { - rpc_done_cv_.wait(l); + rpc_done_cv_.Wait(l); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h index a9acca4..138f3f0 100644 --- a/be/src/runtime/disk-io-mgr-internal.h +++ b/be/src/runtime/disk-io-mgr-internal.h @@ -28,6 +28,7 @@ #include "runtime/disk-io-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" +#include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/debug-util.h" #include "util/disk-info.h" @@ -51,7 +52,7 @@ struct DiskIoMgr::DiskQueue { /// thread should shut down. A disk thread will be woken up when there is a reader /// added to the queue. A reader is only on the queue when it has at least one /// scan range that is not blocked on available buffers. 
- boost::condition_variable work_available; + ConditionVariable work_available; /// list of all request contexts that have work queued on this disk std::list<DiskIoRequestContext*> request_contexts; @@ -65,7 +66,7 @@ struct DiskIoMgr::DiskQueue { request_contexts.end()); request_contexts.push_back(worker); } - work_available.notify_all(); + work_available.NotifyAll(); } DiskQueue(int id) : disk_id(id) { } @@ -157,7 +158,7 @@ class DiskIoRequestContext { // boost doesn't let us dcheck that the reader lock is taken DCHECK_GT(num_disks_with_ranges_, 0); if (--num_disks_with_ranges_ == 0) { - disks_complete_cond_var_.notify_all(); + disks_complete_cond_var_.NotifyAll(); } DCHECK(Validate()) << std::endl << DebugString(); } @@ -289,13 +290,13 @@ class DiskIoRequestContext { /// We currently populate one range per disk. /// TODO: think about this some more. InternalQueue<ScanRange> ready_to_start_ranges_; - boost::condition_variable ready_to_start_ranges_cv_; // used with lock_ + ConditionVariable ready_to_start_ranges_cv_; // used with lock_ /// Ranges that are blocked due to back pressure on outgoing buffers. InternalQueue<ScanRange> blocked_ranges_; /// Condition variable for UnregisterContext() to wait for all disks to complete - boost::condition_variable disks_complete_cond_var_; + ConditionVariable disks_complete_cond_var_; /// Struct containing state per disk. See comments in the disk read loop on how /// they are used. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-reader-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc index 77f332a..afe2b23 100644 --- a/be/src/runtime/disk-io-mgr-reader-context.cc +++ b/be/src/runtime/disk-io-mgr-reader-context.cc @@ -88,7 +88,7 @@ void DiskIoRequestContext::Cancel(const Status& status) { // Signal reader and unblock the GetNext/Read thread. 
That read will fail with // a cancelled status. - ready_to_start_ranges_cv_.notify_all(); + ready_to_start_ranges_cv_.NotifyAll(); } void DiskIoRequestContext::AddRequestRange( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc index c5c9514..7f0692e 100644 --- a/be/src/runtime/disk-io-mgr-scan-range.cc +++ b/be/src/runtime/disk-io-mgr-scan-range.cc @@ -67,7 +67,7 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer( blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT; } - buffer_ready_cv_.notify_one(); + buffer_ready_cv_.NotifyOne(); return blocked_on_queue_; } @@ -81,7 +81,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { DCHECK(Validate()) << DebugString(); while (ready_buffers_.empty() && !is_cancelled_) { - buffer_ready_cv_.wait(scan_range_lock); + buffer_ready_cv_.Wait(scan_range_lock); } if (is_cancelled_) { @@ -153,7 +153,7 @@ void DiskIoMgr::ScanRange::Cancel(const Status& status) { is_cancelled_ = true; status_ = status; } - buffer_ready_cv_.notify_all(); + buffer_ready_cv_.NotifyAll(); CleanupQueuedBuffers(); // For cached buffers, we can't close the range until the cached buffer is returned. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc index 658b747..3959194 100644 --- a/be/src/runtime/disk-io-mgr-stress.cc +++ b/be/src/runtime/disk-io-mgr-stress.cc @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#include <boost/thread/mutex.hpp> + #include "runtime/disk-io-mgr-stress.h" #include "util/time.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h index 6d7549e..0a66f2c 100644 --- a/be/src/runtime/disk-io-mgr-stress.h +++ b/be/src/runtime/disk-io-mgr-stress.h @@ -22,10 +22,7 @@ #include <memory> #include <vector> #include <boost/scoped_ptr.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/thread/thread.hpp> -#include <boost/unordered_map.hpp> #include "runtime/disk-io-mgr.h" #include "runtime/mem-tracker.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index a6a719f..8d36ea6 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -27,6 +27,7 @@ #include "runtime/disk-io-mgr-stress.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" +#include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/disk-info.h" #include "util/thread.h" @@ -37,8 +38,6 @@ DECLARE_int32(num_remote_hdfs_io_threads); DECLARE_int32(num_s3_io_threads); DECLARE_int32(num_adls_io_threads); -using boost::condition_variable; - const int MIN_BUFFER_SIZE = 512; const int MAX_BUFFER_SIZE = 1024; const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024; @@ -72,7 +71,7 @@ class DiskIoMgrTest : public testing::Test { { lock_guard<mutex> l(written_mutex_); ++num_ranges_written_; - if (num_ranges_written_ == num_writes) writes_done_.notify_one(); + if (num_ranges_written_ == num_writes) writes_done_.NotifyOne(); } } @@ -81,7 +80,7 @@ class DiskIoMgrTest : 
public testing::Test { { lock_guard<mutex> l(written_mutex_); ++num_ranges_written_; - if (num_ranges_written_ == num_writes) writes_done_.notify_all(); + if (num_ranges_written_ == num_writes) writes_done_.NotifyAll(); } } @@ -179,7 +178,7 @@ class DiskIoMgrTest : public testing::Test { ObjectPool pool_; mutex written_mutex_; - condition_variable writes_done_; + ConditionVariable writes_done_; int num_ranges_written_; }; @@ -229,7 +228,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) { { unique_lock<mutex> lock(written_mutex_); - while (num_ranges_written_ < num_ranges) writes_done_.wait(lock); + while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock); } num_ranges_written_ = 0; io_mgr.UnregisterContext(writer); @@ -283,7 +282,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { { unique_lock<mutex> lock(written_mutex_); - while (num_ranges_written_ < 2) writes_done_.wait(lock); + while (num_ranges_written_ < 2) writes_done_.Wait(lock); } num_ranges_written_ = 0; io_mgr.UnregisterContext(writer); @@ -342,7 +341,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { { unique_lock<mutex> lock(written_mutex_); - while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.wait(lock); + while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock); } num_ranges_written_ = 0; io_mgr.UnregisterContext(writer); @@ -810,7 +809,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { { unique_lock<mutex> lock(written_mutex_); - while (num_ranges_written_ < num_write_ranges) writes_done_.wait(lock); + while (num_ranges_written_ < num_write_ranges) writes_done_.Wait(lock); } threads.join_all(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 54dfc98..8af70f5 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -343,7 +343,7 @@ 
DiskIoMgr::~DiskIoMgr() { // to shut_down_ are protected. unique_lock<mutex> disk_lock(disk_queues_[i]->lock); } - disk_queues_[i]->work_available.notify_all(); + disk_queues_[i]->work_available.NotifyAll(); } disk_thread_group_.JoinAll(); @@ -469,7 +469,7 @@ void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks unique_lock<mutex> lock(context->lock_); DCHECK(context->Validate()) << endl << context->DebugString(); while (context->num_disks_with_ranges_ > 0) { - context->disks_complete_cond_var_.wait(lock); + context->disks_complete_cond_var_.Wait(lock); } } } @@ -639,7 +639,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) } if (reader->ready_to_start_ranges_.empty()) { - reader->ready_to_start_ranges_cv_.wait(reader_lock); + reader->ready_to_start_ranges_cv_.Wait(reader_lock); } else { *range = reader->ready_to_start_ranges_.Dequeue(); DCHECK(*range != nullptr); @@ -846,7 +846,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, while (!shut_down_ && disk_queue->request_contexts.empty()) { // wait if there are no readers on the queue - disk_queue->work_available.wait(disk_lock); + disk_queue->work_available.Wait(disk_lock); } if (shut_down_) break; DCHECK(!disk_queue->request_contexts.empty()); @@ -909,9 +909,9 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, // All the ranges have been started, notify everyone blocked on GetNextRange. // Only one of them will get work so make sure to return nullptr to the other // caller threads. 
- (*request_context)->ready_to_start_ranges_cv_.notify_all(); + (*request_context)->ready_to_start_ranges_cv_.NotifyAll(); } else { - (*request_context)->ready_to_start_ranges_cv_.notify_one(); + (*request_context)->ready_to_start_ranges_cv_.NotifyOne(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index ed33942..e7a7122 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -25,7 +25,6 @@ #include <boost/scoped_ptr.hpp> #include <boost/unordered_set.hpp> #include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> #include "common/atomic.h" #include "common/hdfs.h" @@ -35,6 +34,7 @@ #include "runtime/thread-resource-mgr.h" #include "util/aligned-new.h" #include "util/bit-util.h" +#include "util/condition-variable.h" #include "util/error-util.h" #include "util/internal-queue.h" #include "util/runtime-profile.h" @@ -563,7 +563,7 @@ class DiskIoMgr : public CacheLineAligned { /// IO buffers that are queued for this scan range. /// Condition variable for GetNext - boost::condition_variable buffer_ready_cv_; + ConditionVariable buffer_ready_cv_; std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; /// Lock that should be taken during hdfs calls. 
Only one thread (the disk reading http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index a7d3c86..f957dd1 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -230,7 +230,7 @@ Status FragmentInstanceState::Prepare() { thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true)); // Make sure the thread started up, otherwise ReportProfileThread() might get into // a race with StopReportThread(). - while (!report_thread_active_) report_thread_started_cv_.wait(l); + while (!report_thread_active_) report_thread_started_cv_.Wait(l); } return Status::OK(); @@ -322,28 +322,26 @@ void FragmentInstanceState::ReportProfileThread() { unique_lock<mutex> l(report_thread_lock_); // tell Prepare() that we started report_thread_active_ = true; - report_thread_started_cv_.notify_one(); + report_thread_started_cv_.NotifyOne(); // Jitter the reporting time of remote fragments by a random amount between // 0 and the report_interval. This way, the coordinator doesn't get all the // updates at once so its better for contention as well as smoother progress // reporting. int report_fragment_offset = rand() % FLAGS_status_report_interval; - boost::system_time timeout = boost::get_system_time() - + boost::posix_time::seconds(report_fragment_offset); + boost::posix_time::seconds wait_duration(report_fragment_offset); // We don't want to wait longer than it takes to run the entire fragment. 
- stop_report_thread_cv_.timed_wait(l, timeout); + stop_report_thread_cv_.WaitFor(l, wait_duration); while (report_thread_active_) { - boost::system_time timeout = boost::get_system_time() - + boost::posix_time::seconds(FLAGS_status_report_interval); + boost::posix_time::seconds loop_wait_duration(FLAGS_status_report_interval); // timed_wait can return because the timeout occurred or the condition variable // was signaled. We can't rely on its return value to distinguish between the // two cases (e.g. there is a race here where the wait timed out but before grabbing // the lock, the condition variable was signaled). Instead, we will use an external // flag, report_thread_active_, to coordinate this. - stop_report_thread_cv_.timed_wait(l, timeout); + stop_report_thread_cv_.WaitFor(l, loop_wait_duration); if (!report_thread_active_) break; SendReport(false, Status::OK()); @@ -378,7 +376,7 @@ void FragmentInstanceState::StopReportThread() { lock_guard<mutex> l(report_thread_lock_); report_thread_active_ = false; } - stop_report_thread_cv_.notify_one(); + stop_report_thread_cv_.NotifyOne(); report_thread_->Join(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h index fa35c6b..f540b56 100644 --- a/be/src/runtime/fragment-instance-state.h +++ b/be/src/runtime/fragment-instance-state.h @@ -28,6 +28,7 @@ #include "gen-cpp/ImpalaInternalService_types.h" #include "runtime/row-batch.h" +#include "util/condition-variable.h" #include "util/promise.h" #include "util/runtime-profile.h" @@ -138,11 +139,11 @@ class FragmentInstanceState { /// Indicates that profile reporting thread should stop. /// Tied to report_thread_lock_. 
- boost::condition_variable stop_report_thread_cv_; + ConditionVariable stop_report_thread_cv_; /// Indicates that profile reporting thread started. /// Tied to report_thread_lock_. - boost::condition_variable report_thread_started_cv_; + ConditionVariable report_thread_started_cv_; /// When the report thread starts, it sets report_thread_active_ to true and signals /// report_thread_started_cv_. The report thread is shut down by setting http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 8994a5b..99f659a 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -233,7 +233,7 @@ AdmissionController::~AdmissionController() { // Lock to ensure the dequeue thread will see the update to done_ lock_guard<mutex> l(admission_ctrl_lock_); done_ = true; - dequeue_cv_.notify_one(); + dequeue_cv_.NotifyOne(); } dequeue_thread_->Join(); } @@ -620,7 +620,7 @@ void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) { VLOG_RPC << "Released query id=" << schedule.query_id() << " " << stats->DebugString(); } - dequeue_cv_.notify_one(); + dequeue_cv_.NotifyOne(); } // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC. 
@@ -648,7 +648,7 @@ void AdmissionController::UpdatePoolStats( } UpdateClusterAggregates(); } - dequeue_cv_.notify_one(); // Dequeue and admit queries on the dequeue thread + dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread } void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id, @@ -821,7 +821,7 @@ void AdmissionController::DequeueLoop() { while (true) { unique_lock<mutex> lock(admission_ctrl_lock_); if (done_) break; - dequeue_cv_.wait(lock); + dequeue_cv_.Wait(lock); for (const PoolConfigMap::value_type& entry: pool_config_map_) { const string& pool_name = entry.first; const TPoolConfig& pool_config = entry.second; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 0cb9f2a..2830bee 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -31,6 +31,7 @@ #include "scheduling/request-pool-service.h" #include "scheduling/query-schedule.h" #include "statestore/statestore-subscriber.h" +#include "util/condition-variable.h" #include "util/internal-queue.h" #include "util/thread.h" @@ -409,7 +410,7 @@ class AdmissionController { /// Notifies the dequeuing thread that pool stats have changed and it may be /// possible to dequeue and admit queries. - boost::condition_variable dequeue_cv_; + ConditionVariable dequeue_cv_; /// If true, tear down the dequeuing thread. This only happens in unit tests. 
bool done_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 523e4ae..5d00d24 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -602,12 +602,12 @@ void ClientRequestState::BlockOnWait() { l.lock(); is_block_on_wait_joining_ = false; wait_thread_.reset(); - block_on_wait_cv_.notify_all(); + block_on_wait_cv_.NotifyAll(); } else { // Another thread is already joining with wait_thread_. Block on the cond-var // until the Join() executed in the other thread has completed. do { - block_on_wait_cv_.wait(l); + block_on_wait_cv_.Wait(l); } while (is_block_on_wait_joining_); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index a5c03f5..968ae04 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -25,6 +25,7 @@ #include "service/child-query.h" #include "service/impala-server.h" #include "util/auth-util.h" +#include "util/condition-variable.h" #include "util/runtime-profile.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Frontend_types.h" @@ -254,7 +255,7 @@ class ClientRequestState { /// Condition variable to make BlockOnWait() thread-safe. One thread joins /// wait_thread_, and all other threads block on this cv. Used with lock_. - boost::condition_variable block_on_wait_cv_; + ConditionVariable block_on_wait_cv_; /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe. 
bool is_block_on_wait_joining_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 4b225b4..07a83eb 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1123,7 +1123,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, multiset<int32_t>::const_iterator itr = session_timeout_set_.find(session_timeout); DCHECK(itr != session_timeout_set_.end()); session_timeout_set_.erase(itr); - session_timeout_cv_.notify_one(); + session_timeout_cv_.NotifyOne(); } return Status::OK(); } @@ -1448,7 +1448,7 @@ void ImpalaServer::CatalogUpdateCallback( unique_lock<mutex> unique_lock(catalog_version_lock_); min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version; } - catalog_version_update_cv_.notify_all(); + catalog_version_update_cv_.NotifyAll(); } Status ImpalaServer::ProcessCatalogUpdateResult( @@ -1487,7 +1487,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult( << " current version: " << catalog_update_info_.catalog_version; while (catalog_update_info_.catalog_version < min_req_catalog_version && catalog_update_info_.catalog_service_id == catalog_service_id) { - catalog_version_update_cv_.wait(unique_lock); + catalog_version_update_cv_.Wait(unique_lock); } if (!wait_for_all_subscribers) return Status::OK(); @@ -1502,7 +1502,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult( << min_subscriber_catalog_topic_version_; while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version && catalog_update_info_.catalog_service_id == catalog_service_id) { - catalog_version_update_cv_.wait(unique_lock); + catalog_version_update_cv_.Wait(unique_lock); } return Status::OK(); } @@ -1786,7 +1786,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { if (session_timeout <= 0) 
return; lock_guard<mutex> l(session_timeout_lock_); session_timeout_set_.insert(session_timeout); - session_timeout_cv_.notify_one(); + session_timeout_cv_.NotifyOne(); } [[noreturn]] void ImpalaServer::ExpireSessions() { @@ -1794,12 +1794,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { { unique_lock<mutex> timeout_lock(session_timeout_lock_); if (session_timeout_set_.empty()) { - session_timeout_cv_.wait(timeout_lock); + session_timeout_cv_.Wait(timeout_lock); } else { // Sleep for a second before checking whether an active session can be expired. - const int64_t SLEEP_TIME_MS = 1000; - system_time deadline = get_system_time() + milliseconds(SLEEP_TIME_MS); - session_timeout_cv_.timed_wait(timeout_lock, deadline); + session_timeout_cv_.WaitFor(timeout_lock, seconds(1)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index f0dfa06..81ec929 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -35,6 +35,7 @@ #include "common/status.h" #include "service/frontend.h" #include "service/query-options.h" +#include "util/condition-variable.h" #include "util/metrics.h" #include "util/runtime-profile.h" #include "util/simple-logger.h" @@ -800,7 +801,7 @@ class ImpalaServer : public ImpalaServiceIf, /// session_timeout_thread_ relies on the following conditional variable to wake up /// on every poll period expiration or when the poll period changes. 
- boost::condition_variable session_timeout_cv_; + ConditionVariable session_timeout_cv_; /// map from query id to exec state; ClientRequestState is owned by us and referenced /// as a shared_ptr to allow asynchronous deletion @@ -922,7 +923,7 @@ class ImpalaServer : public ImpalaServiceIf, boost::mutex catalog_version_lock_; /// Variable to signal when the catalog version has been modified - boost::condition_variable catalog_version_update_cv_; + ConditionVariable catalog_version_update_cv_; /// Contains details on the version information of a catalog update. struct CatalogUpdateVersionInfo { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 26aa836..86a0d9e 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -24,7 +24,6 @@ #include <vector> #include <boost/scoped_ptr.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/unordered_map.hpp> #include <boost/uuid/uuid_generators.hpp> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/blocking-queue.h ---------------------------------------------------------------------- diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h index a32345c..a4b1b8f 100644 --- a/be/src/util/blocking-queue.h +++ b/be/src/util/blocking-queue.h @@ -19,7 +19,6 @@ #ifndef IMPALA_UTIL_BLOCKING_QUEUE_H #define IMPALA_UTIL_BLOCKING_QUEUE_H -#include <boost/thread/condition_variable.hpp> #include <boost/thread/mutex.hpp> #include <boost/scoped_ptr.hpp> #include <deque> @@ -141,12 +140,11 @@ class BlockingQueue : public CacheLineAligned { boost::unique_lock<boost::mutex> write_lock(put_lock_); boost::system_time wtime = boost::get_system_time() + boost::posix_time::microseconds(timeout_micros); - const struct timespec timeout = 
boost::detail::to_timespec(wtime); bool notified = true; while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) { timer.Start(); // Wait until we're notified or until the timeout expires. - notified = put_cv_.TimedWait(write_lock, &timeout); + notified = put_cv_.WaitUntil(write_lock, wtime); timer.Stop(); } total_put_wait_time_ += timer.ElapsedTime(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/condition-variable.h ---------------------------------------------------------------------- diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h index b0b1090..c1a1e56 100644 --- a/be/src/util/condition-variable.h +++ b/be/src/util/condition-variable.h @@ -20,6 +20,7 @@ #include <boost/thread/pthread/timespec.hpp> #include <boost/thread/mutex.hpp> +#include <boost/thread/thread_time.hpp> #include <pthread.h> #include <unistd.h> @@ -34,7 +35,7 @@ class ConditionVariable { ~ConditionVariable() { pthread_cond_destroy(&cv_); } /// Wait indefinitely on the condition variable until it's notified. - inline void Wait(boost::unique_lock<boost::mutex>& lock) { + void Wait(boost::unique_lock<boost::mutex>& lock) { DCHECK(lock.owns_lock()); pthread_mutex_t* mutex = lock.mutex()->native_handle(); pthread_cond_wait(&cv_, mutex); @@ -43,18 +44,36 @@ class ConditionVariable { /// Wait until the condition variable is notified or 'timeout' has passed. /// Returns true if the condition variable is notified before the absolute timeout /// specified in 'timeout' has passed. Returns false otherwise. 
- inline bool TimedWait(boost::unique_lock<boost::mutex>& lock, - const struct timespec* timeout) { + bool WaitUntil(boost::unique_lock<boost::mutex>& lock, + const timespec& abs_time) { DCHECK(lock.owns_lock()); pthread_mutex_t* mutex = lock.mutex()->native_handle(); - return pthread_cond_timedwait(&cv_, mutex, timeout) == 0; + return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0; + } + + /// Wait until the condition variable is notified or 'abs_time' has passed. + /// Returns true if the condition variable is notified before the absolute timeout + /// specified in 'abs_time' has passed. Returns false otherwise. + bool WaitUntil(boost::unique_lock<boost::mutex>& lock, + const boost::system_time& abs_time) { + return WaitUntil(lock, to_timespec(abs_time)); + } + + /// Wait until the condition variable is notified or have waited for the time + /// specified in 'wait_duration'. + /// Returns true if the condition variable is notified in time. + /// Returns false otherwise. + template <typename duration_type> + bool WaitFor(boost::unique_lock<boost::mutex>& lock, + const duration_type& wait_duration) { + return WaitUntil(lock, to_timespec(boost::get_system_time() + wait_duration)); } /// Notify a single waiter on this condition variable. - inline void NotifyOne() { pthread_cond_signal(&cv_); } + void NotifyOne() { pthread_cond_signal(&cv_); } /// Notify all waiters on this condition variable. 
- inline void NotifyAll() { pthread_cond_broadcast(&cv_); } + void NotifyAll() { pthread_cond_broadcast(&cv_); } private: pthread_cond_t cv_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/promise.h ---------------------------------------------------------------------- diff --git a/be/src/util/promise.h b/be/src/util/promise.h index c6b8f15..5de2d13 100644 --- a/be/src/util/promise.h +++ b/be/src/util/promise.h @@ -21,6 +21,7 @@ #include <algorithm> #include <boost/thread.hpp> +#include "util/condition-variable.h" #include "util/time.h" #include "common/atomic.h" #include "common/logging.h" @@ -53,9 +54,9 @@ class Promise { /// p.get(); /// } /// < promise object gets destroyed > - /// Calling notify_all() with the val_lock_ guarantees that the thread calling + /// Calling NotifyAll() with the val_lock_ guarantees that the thread calling /// Set() is done and the promise is safe to delete. - val_set_cond_.notify_all(); + val_set_cond_.NotifyAll(); } /// Blocks until a value is set, and then returns a reference to that value. 
Once Get() @@ -63,7 +64,7 @@ class Promise { const T& Get() { boost::unique_lock<boost::mutex> l(val_lock_); while (!val_is_set_) { - val_set_cond_.wait(l); + val_set_cond_.Wait(l); } return val_; } @@ -86,7 +87,7 @@ class Promise { boost::posix_time::microseconds wait_time = boost::posix_time::microseconds(std::max<int64_t>( 1, timeout_micros - (now - start))); - val_set_cond_.timed_wait(l, wait_time); + val_set_cond_.WaitFor(l, wait_time); now = MonotonicMicros(); } *timed_out = !val_is_set_; @@ -102,7 +103,7 @@ class Promise { private: /// These variables deal with coordination between consumer and producer, and protect /// access to val_; - boost::condition_variable val_set_cond_; + ConditionVariable val_set_cond_; bool val_is_set_; boost::mutex val_lock_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/thread-pool.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h index 4b5dbf0..fedba58 100644 --- a/be/src/util/thread-pool.h +++ b/be/src/util/thread-pool.h @@ -24,6 +24,7 @@ #include <boost/bind/mem_fn.hpp> #include "util/aligned-new.h" +#include "util/condition-variable.h" #include "util/thread.h" namespace impala { @@ -135,7 +136,7 @@ class ThreadPool : public CacheLineAligned { // If the ThreadPool is not initialized, then the queue must be empty. DCHECK(initialized_ || work_queue_.Size() == 0); while (work_queue_.Size() != 0) { - empty_cv_.wait(l); + empty_cv_.Wait(l); } } Shutdown(); @@ -156,7 +157,7 @@ class ThreadPool : public CacheLineAligned { /// GetSize() and wait()'ing when the condition variable is notified. /// (It will hang if we notify right before calling wait().) 
boost::unique_lock<boost::mutex> l(lock_); - empty_cv_.notify_all(); + empty_cv_.NotifyAll(); } } } @@ -200,7 +201,7 @@ class ThreadPool : public CacheLineAligned { bool shutdown_ = false; /// Signalled when the queue becomes empty - boost::condition_variable empty_cv_; + ConditionVariable empty_cv_; }; /// Utility thread-pool that accepts callable work items, and simply invokes them.
