[c++client] performance optimizations

The change to Batcher::ComputeDeadlineUnlocked() gave about a 50% boost
in scenarios where the session timeout is not set and write operations
are small (raw/wire size ~100 bytes).

Avoid calling KuduSession::shared_from_this() for every scheduled
write operation: KuduSession::Data now keeps a weak reference to its
session instead.

Change-Id: I4b57fc7355f9f673f30861ec30cb6b48cdf656d2
Reviewed-on: http://gerrit.cloudera.org:8080/4385
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dral...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1f639da6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1f639da6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1f639da6

Branch: refs/heads/master
Commit: 1f639da62d1c995ad9a7b596160b42054757f5e1
Parents: 09bf034
Author: Alexey Serbin <aser...@cloudera.com>
Authored: Wed Sep 14 23:52:01 2016 -0700
Committer: Alexey Serbin <aser...@cloudera.com>
Committed: Mon Sep 19 04:22:57 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc          | 14 +++------
 src/kudu/client/batcher.h           |  4 +--
 src/kudu/client/client.cc           |  4 +--
 src/kudu/client/session-internal.cc | 51 +++++++++++++-------------------
 src/kudu/client/session-internal.h  | 23 ++++++++++----
 5 files changed, 45 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index de6cfdc..d7c4dca 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -389,6 +389,7 @@ Batcher::Batcher(KuduClient* client,
     had_errors_(false),
     flush_callback_(nullptr),
     next_op_sequence_number_(0),
+    timeout_(MonoDelta::FromSeconds(60)),
     outstanding_lookups_(0),
     buffer_bytes_used_(0) {
 }
@@ -427,10 +428,9 @@ Batcher::~Batcher() {
   CHECK(state_ == kFlushed || state_ == kAborted) << "Bad state: " << state_;
 }
 
-void Batcher::SetTimeoutMillis(int millis) {
-  CHECK_GE(millis, 0);
+void Batcher::SetTimeout(const MonoDelta& timeout) {
   std::lock_guard<simple_spinlock> l(lock_);
-  timeout_ = MonoDelta::FromMilliseconds(millis);
+  timeout_ = timeout;
 }
 
 
@@ -478,13 +478,7 @@ void Batcher::CheckForFinishedFlush() {
 }
 
 MonoTime Batcher::ComputeDeadlineUnlocked() const {
-  MonoDelta timeout = timeout_;
-  if (PREDICT_FALSE(!timeout.Initialized())) {
-    KLOG_EVERY_N(WARNING, 1000) << "Client writing with no timeout set, using 60 seconds.\n"
-                                << GetStackTrace();
-    timeout = MonoDelta::FromSeconds(60);
-  }
-  return MonoTime::Now() + timeout;
+  return MonoTime::Now() + timeout_;
 }
 
 void Batcher::FlushAsync(KuduStatusCallback* cb) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index 566d356..3a52016 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -80,7 +80,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // The timeout is currently set on all of the RPCs, but in the future will be relative
   // to when the Flush call is made (eg even if the lookup of the TS takes a long time, it
   // may time out before even sending an op). TODO: implement that
-  void SetTimeoutMillis(int millis);
+  void SetTimeout(const MonoDelta& timeout);
 
   // Add a new operation to the batch. Requires that the batch has not yet been flushed.
   //
@@ -215,7 +215,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
 
   // Amount of time to wait for a given op, from start to finish.
   //
-  // Set by SetTimeoutMillis.
+  // Set by SetTimeout().
   MonoDelta timeout_;
 
   // After flushing, the absolute deadline for all in-flight ops.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 4609f2f..8314019 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -775,7 +775,7 @@ Status KuduSession::SetFlushMode(FlushMode m) {
     // Be paranoid in client code.
     return Status::InvalidArgument("Bad flush mode");
   }
-  return data_->SetFlushMode(m, shared_from_this());
+  return data_->SetFlushMode(m);
 }
 
 Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) {
@@ -820,7 +820,7 @@ bool KuduSession::HasPendingOperations() const {
 }
 
 Status KuduSession::Apply(KuduWriteOperation* write_op) {
-  RETURN_NOT_OK(data_->ApplyWriteOp(shared_from_this(), write_op));
+  RETURN_NOT_OK(data_->ApplyWriteOp(write_op));
   // Thread-safety note: this method should not be called concurrently
   // with other methods which modify the KuduSession::Data members, so it
   // should be safe to read KuduSession::Data members without protection.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 6fed4e0..b97fb3d 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -45,7 +45,6 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
       messenger_(std::move(messenger)),
       error_collector_(new ErrorCollector()),
       external_consistency_mode_(CLIENT_PROPAGATED),
-      timeout_ms_(-1),
       flush_interval_(MonoDelta::FromMilliseconds(1000)),
       flush_task_active_(false),
       flush_mode_(AUTO_FLUSH_SYNC),
@@ -59,7 +58,8 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
 }
 
 void KuduSession::Data::Init(weak_ptr<KuduSession> session) {
-  TimeBasedFlushInit(std::move(session));
+  session_.swap(session);
+  TimeBasedFlushInit();
 }
 
 void KuduSession::Data::FlushFinished(Batcher* batcher) {
@@ -100,7 +100,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
         "Cannot change external consistency mode when writes are buffered");
   }
   // Thread-safety note: the external_consistency_mode_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // However, the lock is needed to check for pending operations because
   // there may be pending RPCs and the background flush task may be running.
@@ -108,8 +108,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
   return Status::OK();
 }
 
-Status KuduSession::Data::SetFlushMode(FlushMode mode,
-                                       sp::weak_ptr<KuduSession> session) {
+Status KuduSession::Data::SetFlushMode(FlushMode mode) {
   {
     std::lock_guard<Mutex> l(mutex_);
     if (HasPendingOperationsUnlocked()) {
@@ -129,7 +128,7 @@ Status KuduSession::Data::SetFlushMode(FlushMode mode,
     flush_mode_ = mode;
   }
 
-  TimeBasedFlushInit(std::move(session));
+  TimeBasedFlushInit();
 
   return Status::OK();
 }
@@ -142,7 +141,7 @@ Status KuduSession::Data::SetBufferBytesLimit(size_t size) {
         "Cannot change buffer size limit when writes are buffered.");
   }
   // Thread-safety note: the buffer_bytes_limit_ is not supposed to be accessed
-  // or modified from any other thread of control: no thread-safety is assumed
+  // or modified from any other thread: no thread-safety is assumed
   // for the kudu::KuduSession interface. Due to the latter reason,
   // there should not be any threads waiting on conditions which are affected
   // by the change, so signalling other threads isn't necessary here.
@@ -165,7 +164,7 @@ Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) {
         "Cannot change buffer flush watermark when writes are buffered.");
   }
   // Thread-safety note: the buffer_watermark_pct_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // Due to the latter reason, there should not be any threads waiting on
   // conditions which are affected by the setting, so no signalling
@@ -199,7 +198,7 @@ Status KuduSession::Data::SetMaxBatchersNum(unsigned int max_num) {
         "Cannot change the limit on maximum number of batchers when writes are buffered.");
   }
   // Thread-safety note: the batchers_num_limit_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // Due to the latter reason, there should not be any threads waiting
   // on conditions which are affected by the setting, so no signalling
@@ -216,9 +215,9 @@ void KuduSession::Data::SetTimeoutMillis(int timeout_ms) {
   }
   {
     std::lock_guard<Mutex> l(mutex_);
-    timeout_ms_ = timeout_ms;
+    timeout_ = MonoDelta::FromMilliseconds(timeout_ms);
     if (batcher_) {
-      batcher_->SetTimeoutMillis(timeout_ms);
+      batcher_->SetTimeout(timeout_);
     }
   }
 }
@@ -272,7 +271,7 @@ void KuduSession::Data::FlushCurrentBatcher(int64_t watermark,
   scoped_refptr<Batcher> batcher_to_flush;
   {
     std::lock_guard<Mutex> l(mutex_);
-    if (batcher_ && batcher_->buffer_bytes_used() >= watermark) {
+    if (PREDICT_TRUE(batcher_) && batcher_->buffer_bytes_used() >= watermark) {
       batcher_to_flush.swap(batcher_);
     }
   }
@@ -319,10 +318,7 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
 // from this this method, the operation must end up either in the corresponding
 // batcher (success path) or in the error collector (failure path). Otherwise
 // it would be a memory leak.
-Status KuduSession::Data::ApplyWriteOp(
-    sp::weak_ptr<KuduSession> weak_session,
-    KuduWriteOperation* write_op) {
-
+Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) {
   if (PREDICT_FALSE(!write_op)) {
     return Status::InvalidArgument("NULL operation");
   }
@@ -337,14 +333,7 @@ Status KuduSession::Data::ApplyWriteOp(
   // Get 'wire size' of the write operation.
   const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op);
 
-  // Thread-safety note: the buffer_bytes_limit_ and
-  // buffer_watermark_pct_ are not supposed to be modified
-  // from any other thread of control since no thread-safety is advertised
-  // for the kudu::KuduSession interface.
-  // So, no protection while accessing those members.
   const size_t max_size = buffer_bytes_limit_;
-  const size_t flush_watermark =
-      buffer_bytes_limit_ * buffer_watermark_pct_ / 100;
   // Thread-safety note: the flush_mode_ is accessed from the background
   // time-based flush task for reading. Practically, it would be possible
   // to get away with not protecting the flush_mode_ since it's read-only
@@ -426,13 +415,12 @@ Status KuduSession::Data::ApplyWriteOp(
       DCHECK(!batcher_);
       // Thread-safety note: the external_consistecy_mode_ and timeout_ms_
       // are not supposed to be accessed or modified from any other thread
-      // of control since no thread-safety is advertised
-      // for the kudu::KuduSession interface.
+      // no thread-safety is advertised for the kudu::KuduSession interface.
       scoped_refptr<Batcher> batcher(
-          new Batcher(client_.get(), error_collector_, std::move(weak_session),
+          new Batcher(client_.get(), error_collector_, session_,
                       external_consistency_mode_));
-      if (timeout_ms_ != -1) {
-        batcher->SetTimeoutMillis(timeout_ms_);
+      if (timeout_.Initialized()) {
+        batcher->SetTimeout(timeout_);
       }
       batcher.swap(batcher_);
       ++batchers_num_;
@@ -448,6 +436,8 @@ Status KuduSession::Data::ApplyWriteOp(
   }
 
   if (flush_mode == AUTO_FLUSH_BACKGROUND) {
+    const size_t flush_watermark =
+        buffer_bytes_limit_ * buffer_watermark_pct_ / 100;
     // In AUTO_FLUSH_BACKGROUND mode it's necessary to flush the newly added
     // operations if the flush watermark is reached. The current batcher is
     // the exclusive and the only container for the newly added operations.
@@ -459,10 +449,9 @@ Status KuduSession::Data::ApplyWriteOp(
   return Status::OK();
 }
 
-void KuduSession::Data::TimeBasedFlushInit(
-    sp::weak_ptr<KuduSession> weak_session) {
+void KuduSession::Data::TimeBasedFlushInit() {
   KuduSession::Data::TimeBasedFlushTask(
-      Status::OK(), messenger_, std::move(weak_session), true);
+      Status::OK(), messenger_, session_, true);
 }
 
 void KuduSession::Data::TimeBasedFlushTask(

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index fe2225c..66a70a6 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -60,7 +60,7 @@ class KuduSession::Data {
   void Init(sp::weak_ptr<KuduSession> session);
 
   // Called by Batcher when a flush has finished.
-  void FlushFinished(internal::Batcher* b);
+  void FlushFinished(internal::Batcher* batcher);
 
   // Returns Status::IllegalState() if 'force' is false and there are still pending
   // operations. If 'force' is true batcher_ is aborted even if there are pending
@@ -68,7 +68,7 @@ class KuduSession::Data {
   Status Close(bool force);
 
   // Set flush mode for the session.
-  Status SetFlushMode(FlushMode mode, sp::weak_ptr<KuduSession> session);
+  Status SetFlushMode(FlushMode mode);
 
   // Set external consistency mode for the session.
   Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);
@@ -123,11 +123,10 @@ class KuduSession::Data {
   MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);
 
   // Apply a write operation, i.e. push it through the batcher chain.
-  Status ApplyWriteOp(sp::weak_ptr<KuduSession> session,
-                      KuduWriteOperation* write_op);
+  Status ApplyWriteOp(KuduWriteOperation* write_op);
 
   // Check and start the time-based flush task in background, if necessary.
-  void TimeBasedFlushInit(sp::weak_ptr<KuduSession> weak_session);
+  void TimeBasedFlushInit();
 
   // The self-rescheduling task to flush write operations which have been
   // accumulating for too long (controlled by flush_interval_).
@@ -158,6 +157,12 @@ class KuduSession::Data {
   // The client that this session is associated with.
   const sp::shared_ptr<KuduClient> client_;
 
+  // Weak reference to the containing session. The reference is weak to
+  // avoid circular referencing.  The reference to the KuduSession object
+  // is needed by batchers and time-based flush task: being run in independent
+  // threads, they need to make sure the object is alive before accessing it.
+  sp::weak_ptr<KuduSession> session_;
+
   // The reference to the client's messenger (keeping the reference instead of
   // declaring friendship to KuduClient and accessing it via the client_).
   std::weak_ptr<rpc::Messenger> messenger_;
@@ -168,7 +173,7 @@ class KuduSession::Data {
   kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_;
 
   // Timeout for the next batch.
-  int timeout_ms_;
+  MonoDelta timeout_;
 
   // Interval for the max-wait flush background task.
   MonoDelta flush_interval_;  // protected by mutex_
@@ -207,12 +212,18 @@ class KuduSession::Data {
   // operations. The buffer is a virtual entity: there isn't contiguous place
   // in the memory which would contain that 'buffered' data. Instead, buffer's
   // data is spread across all pending operations in all active batchers.
+  // Thread-safety note: buffer_bytes_limit_ is not supposed to be modified
+  // from any other thread since no thread-safety is advertised for the
+  // kudu::KuduSession interface.
   size_t buffer_bytes_limit_;
 
   // The high-watermark level as the percentage of the buffer space used by
   // freshly added (not-yet-scheduled-for-flush) write operations.
   // Once the level is reached, the BackgroundFlusher triggers flushing
   // of accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode.
+  // Thread-safety note: buffer_watermark_pct_ is not supposed to be modified
+  // from any other thread since no thread-safety is advertised for the
+  // kudu::KuduSession interface.
   int32_t buffer_watermark_pct_;
 
   // The total number of bytes used by buffered write operations.

Reply via email to