[c++client] performance optimizations. The change to Batcher::ComputeDeadlineUnlocked() gave about a 50% boost in scenarios where the session timeout is not set and write operations are small (raw/wire size ~100 bytes).
Avoid calling std::shared_from_this(KuduSession) for every scheduled write operation. Change-Id: I4b57fc7355f9f673f30861ec30cb6b48cdf656d2 Reviewed-on: http://gerrit.cloudera.org:8080/4385 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <dral...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1f639da6 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1f639da6 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1f639da6 Branch: refs/heads/master Commit: 1f639da62d1c995ad9a7b596160b42054757f5e1 Parents: 09bf034 Author: Alexey Serbin <aser...@cloudera.com> Authored: Wed Sep 14 23:52:01 2016 -0700 Committer: Alexey Serbin <aser...@cloudera.com> Committed: Mon Sep 19 04:22:57 2016 +0000 ---------------------------------------------------------------------- src/kudu/client/batcher.cc | 14 +++------ src/kudu/client/batcher.h | 4 +-- src/kudu/client/client.cc | 4 +-- src/kudu/client/session-internal.cc | 51 +++++++++++++------------------- src/kudu/client/session-internal.h | 23 ++++++++++---- 5 files changed, 45 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc index de6cfdc..d7c4dca 100644 --- a/src/kudu/client/batcher.cc +++ b/src/kudu/client/batcher.cc @@ -389,6 +389,7 @@ Batcher::Batcher(KuduClient* client, had_errors_(false), flush_callback_(nullptr), next_op_sequence_number_(0), + timeout_(MonoDelta::FromSeconds(60)), outstanding_lookups_(0), buffer_bytes_used_(0) { } @@ -427,10 +428,9 @@ Batcher::~Batcher() { CHECK(state_ == kFlushed || state_ == kAborted) << "Bad state: " << state_; } -void Batcher::SetTimeoutMillis(int millis) { - CHECK_GE(millis, 0); +void Batcher::SetTimeout(const MonoDelta& 
timeout) { std::lock_guard<simple_spinlock> l(lock_); - timeout_ = MonoDelta::FromMilliseconds(millis); + timeout_ = timeout; } @@ -478,13 +478,7 @@ void Batcher::CheckForFinishedFlush() { } MonoTime Batcher::ComputeDeadlineUnlocked() const { - MonoDelta timeout = timeout_; - if (PREDICT_FALSE(!timeout.Initialized())) { - KLOG_EVERY_N(WARNING, 1000) << "Client writing with no timeout set, using 60 seconds.\n" - << GetStackTrace(); - timeout = MonoDelta::FromSeconds(60); - } - return MonoTime::Now() + timeout; + return MonoTime::Now() + timeout_; } void Batcher::FlushAsync(KuduStatusCallback* cb) { http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h index 566d356..3a52016 100644 --- a/src/kudu/client/batcher.h +++ b/src/kudu/client/batcher.h @@ -80,7 +80,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> { // The timeout is currently set on all of the RPCs, but in the future will be relative // to when the Flush call is made (eg even if the lookup of the TS takes a long time, it // may time out before even sending an op). TODO: implement that - void SetTimeoutMillis(int millis); + void SetTimeout(const MonoDelta& timeout); // Add a new operation to the batch. Requires that the batch has not yet been flushed. // @@ -215,7 +215,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> { // Amount of time to wait for a given op, from start to finish. // - // Set by SetTimeoutMillis. + // Set by SetTimeout(). MonoDelta timeout_; // After flushing, the absolute deadline for all in-flight ops. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 4609f2f..8314019 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -775,7 +775,7 @@ Status KuduSession::SetFlushMode(FlushMode m) { // Be paranoid in client code. return Status::InvalidArgument("Bad flush mode"); } - return data_->SetFlushMode(m, shared_from_this()); + return data_->SetFlushMode(m); } Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) { @@ -820,7 +820,7 @@ bool KuduSession::HasPendingOperations() const { } Status KuduSession::Apply(KuduWriteOperation* write_op) { - RETURN_NOT_OK(data_->ApplyWriteOp(shared_from_this(), write_op)); + RETURN_NOT_OK(data_->ApplyWriteOp(write_op)); // Thread-safety note: this method should not be called concurrently // with other methods which modify the KuduSession::Data members, so it // should be safe to read KuduSession::Data members without protection. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc index 6fed4e0..b97fb3d 100644 --- a/src/kudu/client/session-internal.cc +++ b/src/kudu/client/session-internal.cc @@ -45,7 +45,6 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client, messenger_(std::move(messenger)), error_collector_(new ErrorCollector()), external_consistency_mode_(CLIENT_PROPAGATED), - timeout_ms_(-1), flush_interval_(MonoDelta::FromMilliseconds(1000)), flush_task_active_(false), flush_mode_(AUTO_FLUSH_SYNC), @@ -59,7 +58,8 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client, } void KuduSession::Data::Init(weak_ptr<KuduSession> session) { - TimeBasedFlushInit(std::move(session)); + session_.swap(session); + TimeBasedFlushInit(); } void KuduSession::Data::FlushFinished(Batcher* batcher) { @@ -100,7 +100,7 @@ Status KuduSession::Data::SetExternalConsistencyMode( "Cannot change external consistency mode when writes are buffered"); } // Thread-safety note: the external_consistency_mode_ is not supposed - // to be accessed or modified from any other thread of control: + // to be accessed or modified from any other thread: // no thread-safety is assumed for the kudu::KuduSession interface. // However, the lock is needed to check for pending operations because // there may be pending RPCs and the background flush task may be running. 
@@ -108,8 +108,7 @@ Status KuduSession::Data::SetExternalConsistencyMode( return Status::OK(); } -Status KuduSession::Data::SetFlushMode(FlushMode mode, - sp::weak_ptr<KuduSession> session) { +Status KuduSession::Data::SetFlushMode(FlushMode mode) { { std::lock_guard<Mutex> l(mutex_); if (HasPendingOperationsUnlocked()) { @@ -129,7 +128,7 @@ Status KuduSession::Data::SetFlushMode(FlushMode mode, flush_mode_ = mode; } - TimeBasedFlushInit(std::move(session)); + TimeBasedFlushInit(); return Status::OK(); } @@ -142,7 +141,7 @@ Status KuduSession::Data::SetBufferBytesLimit(size_t size) { "Cannot change buffer size limit when writes are buffered."); } // Thread-safety note: the buffer_bytes_limit_ is not supposed to be accessed - // or modified from any other thread of control: no thread-safety is assumed + // or modified from any other thread: no thread-safety is assumed // for the kudu::KuduSession interface. Due to the latter reason, // there should not be any threads waiting on conditions which are affected // by the change, so signalling other threads isn't necessary here. @@ -165,7 +164,7 @@ Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) { "Cannot change buffer flush watermark when writes are buffered."); } // Thread-safety note: the buffer_watermark_pct_ is not supposed - // to be accessed or modified from any other thread of control: + // to be accessed or modified from any other thread: // no thread-safety is assumed for the kudu::KuduSession interface. 
// Due to the latter reason, there should not be any threads waiting on // conditions which are affected by the setting, so no signalling @@ -199,7 +198,7 @@ Status KuduSession::Data::SetMaxBatchersNum(unsigned int max_num) { "Cannot change the limit on maximum number of batchers when writes are buffered."); } // Thread-safety note: the batchers_num_limit_ is not supposed - // to be accessed or modified from any other thread of control: + // to be accessed or modified from any other thread: // no thread-safety is assumed for the kudu::KuduSession interface. // Due to the latter reason, there should not be any threads waiting // on conditions which are affected by the setting, so no signalling @@ -216,9 +215,9 @@ void KuduSession::Data::SetTimeoutMillis(int timeout_ms) { } { std::lock_guard<Mutex> l(mutex_); - timeout_ms_ = timeout_ms; + timeout_ = MonoDelta::FromMilliseconds(timeout_ms); if (batcher_) { - batcher_->SetTimeoutMillis(timeout_ms); + batcher_->SetTimeout(timeout_); } } } @@ -272,7 +271,7 @@ void KuduSession::Data::FlushCurrentBatcher(int64_t watermark, scoped_refptr<Batcher> batcher_to_flush; { std::lock_guard<Mutex> l(mutex_); - if (batcher_ && batcher_->buffer_bytes_used() >= watermark) { + if (PREDICT_TRUE(batcher_) && batcher_->buffer_bytes_used() >= watermark) { batcher_to_flush.swap(batcher_); } } @@ -319,10 +318,7 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) { // from this this method, the operation must end up either in the corresponding // batcher (success path) or in the error collector (failure path). Otherwise // it would be a memory leak. 
-Status KuduSession::Data::ApplyWriteOp( - sp::weak_ptr<KuduSession> weak_session, - KuduWriteOperation* write_op) { - +Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) { if (PREDICT_FALSE(!write_op)) { return Status::InvalidArgument("NULL operation"); } @@ -337,14 +333,7 @@ Status KuduSession::Data::ApplyWriteOp( // Get 'wire size' of the write operation. const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op); - // Thread-safety note: the buffer_bytes_limit_ and - // buffer_watermark_pct_ are not supposed to be modified - // from any other thread of control since no thread-safety is advertised - // for the kudu::KuduSession interface. - // So, no protection while accessing those members. const size_t max_size = buffer_bytes_limit_; - const size_t flush_watermark = - buffer_bytes_limit_ * buffer_watermark_pct_ / 100; // Thread-safety note: the flush_mode_ is accessed from the background // time-based flush task for reading. Practically, it would be possible // to get away with not protecting the flush_mode_ since it's read-only @@ -426,13 +415,12 @@ Status KuduSession::Data::ApplyWriteOp( DCHECK(!batcher_); // Thread-safety note: the external_consistecy_mode_ and timeout_ms_ // are not supposed to be accessed or modified from any other thread - // of control since no thread-safety is advertised - // for the kudu::KuduSession interface. + // no thread-safety is advertised for the kudu::KuduSession interface. 
scoped_refptr<Batcher> batcher( - new Batcher(client_.get(), error_collector_, std::move(weak_session), + new Batcher(client_.get(), error_collector_, session_, external_consistency_mode_)); - if (timeout_ms_ != -1) { - batcher->SetTimeoutMillis(timeout_ms_); + if (timeout_.Initialized()) { + batcher->SetTimeout(timeout_); } batcher.swap(batcher_); ++batchers_num_; @@ -448,6 +436,8 @@ Status KuduSession::Data::ApplyWriteOp( } if (flush_mode == AUTO_FLUSH_BACKGROUND) { + const size_t flush_watermark = + buffer_bytes_limit_ * buffer_watermark_pct_ / 100; // In AUTO_FLUSH_BACKGROUND mode it's necessary to flush the newly added // operations if the flush watermark is reached. The current batcher is // the exclusive and the only container for the newly added operations. @@ -459,10 +449,9 @@ Status KuduSession::Data::ApplyWriteOp( return Status::OK(); } -void KuduSession::Data::TimeBasedFlushInit( - sp::weak_ptr<KuduSession> weak_session) { +void KuduSession::Data::TimeBasedFlushInit() { KuduSession::Data::TimeBasedFlushTask( - Status::OK(), messenger_, std::move(weak_session), true); + Status::OK(), messenger_, session_, true); } void KuduSession::Data::TimeBasedFlushTask( http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h index fe2225c..66a70a6 100644 --- a/src/kudu/client/session-internal.h +++ b/src/kudu/client/session-internal.h @@ -60,7 +60,7 @@ class KuduSession::Data { void Init(sp::weak_ptr<KuduSession> session); // Called by Batcher when a flush has finished. - void FlushFinished(internal::Batcher* b); + void FlushFinished(internal::Batcher* batcher); // Returns Status::IllegalState() if 'force' is false and there are still pending // operations. 
If 'force' is true batcher_ is aborted even if there are pending @@ -68,7 +68,7 @@ class KuduSession::Data { Status Close(bool force); // Set flush mode for the session. - Status SetFlushMode(FlushMode mode, sp::weak_ptr<KuduSession> session); + Status SetFlushMode(FlushMode mode); // Set external consistency mode for the session. Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m); @@ -123,11 +123,10 @@ class KuduSession::Data { MonoDelta FlushCurrentBatcher(const MonoDelta& max_age); // Apply a write operation, i.e. push it through the batcher chain. - Status ApplyWriteOp(sp::weak_ptr<KuduSession> session, - KuduWriteOperation* write_op); + Status ApplyWriteOp(KuduWriteOperation* write_op); // Check and start the time-based flush task in background, if necessary. - void TimeBasedFlushInit(sp::weak_ptr<KuduSession> weak_session); + void TimeBasedFlushInit(); // The self-rescheduling task to flush write operations which have been // accumulating for too long (controlled by flush_interval_). @@ -158,6 +157,12 @@ class KuduSession::Data { // The client that this session is associated with. const sp::shared_ptr<KuduClient> client_; + // Weak reference to the containing session. The reference is weak to + // avoid circular referencing. The reference to the KuduSession object + // is needed by batchers and time-based flush task: being run in independent + // threads, they need to make sure the object is alive before accessing it. + sp::weak_ptr<KuduSession> session_; + // The reference to the client's messenger (keeping the reference instead of // declaring friendship to KuduClient and accessing it via the client_). std::weak_ptr<rpc::Messenger> messenger_; @@ -168,7 +173,7 @@ class KuduSession::Data { kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_; // Timeout for the next batch. - int timeout_ms_; + MonoDelta timeout_; // Interval for the max-wait flush background task. 
MonoDelta flush_interval_; // protected by mutex_ @@ -207,12 +212,18 @@ class KuduSession::Data { // operations. The buffer is a virtual entity: there isn't contiguous place // in the memory which would contain that 'buffered' data. Instead, buffer's // data is spread across all pending operations in all active batchers. + // Thread-safety note: buffer_bytes_limit_ is not supposed to be modified + // from any other thread since no thread-safety is advertised for the + // kudu::KuduSession interface. size_t buffer_bytes_limit_; // The high-watermark level as the percentage of the buffer space used by // freshly added (not-yet-scheduled-for-flush) write operations. // Once the level is reached, the BackgroundFlusher triggers flushing // of accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode. + // Thread-safety note: buffer_watermark_pct_ is not supposed to be modified + // from any other thread since no thread-safety is advertised for the + // kudu::KuduSession interface. int32_t buffer_watermark_pct_; // The total number of bytes used by buffered write operations.