http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/session-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 552b507..8da7679 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -20,64 +20,514 @@
 #include <mutex>
 
 #include "kudu/client/batcher.h"
+#include "kudu/client/callbacks.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
 
 namespace kudu {
 
+using rpc::Messenger;
+
 namespace client {
 
 using internal::Batcher;
 using internal::ErrorCollector;
 
 using sp::shared_ptr;
+using sp::weak_ptr;
+
 
-KuduSession::Data::Data(shared_ptr<KuduClient> client)
+// Constructs the session's internal state with the default settings:
+// AUTO_FLUSH_SYNC flush mode, CLIENT_PROPAGATED external consistency,
+// timeout_ms_ of -1 (not set: it is then not propagated to new batchers),
+// a 1000 ms background flush interval, at most 2 active batchers,
+// a 7 MiB buffer limit with an 80% flush watermark, and buffer pre-flushing
+// enabled.
+KuduSession::Data::Data(shared_ptr<KuduClient> client,
+                        std::shared_ptr<rpc::Messenger> messenger)
     : client_(std::move(client)),
+      messenger_(std::move(messenger)),
       error_collector_(new ErrorCollector()),
-      flush_mode_(AUTO_FLUSH_SYNC),
       external_consistency_mode_(CLIENT_PROPAGATED),
-      timeout_ms_(-1) {
+      timeout_ms_(-1),
+      flush_interval_(MonoDelta::FromMilliseconds(1000)),
+      flush_task_active_(false),
+      flush_mode_(AUTO_FLUSH_SYNC),
+      condition_(&mutex_),
+      batchers_num_(0),
+      batchers_num_limit_(2),
+      buffer_bytes_limit_(7 * 1024 * 1024),
+      buffer_watermark_pct_(80),
+      buffer_bytes_used_(0),
+      buffer_pre_flush_enabled_(true) {
 }
 
-KuduSession::Data::~Data() {
+// Second-stage initialization which needs a weak reference to the owning
+// session: starts the time-based background flush task. The task does real
+// work only in AUTO_FLUSH_BACKGROUND mode.
+void KuduSession::Data::Init(const weak_ptr<KuduSession>& session) {
+  TimeBasedFlushInit(session);
 }
 
-void KuduSession::Data::Init(const shared_ptr<KuduSession>& session) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  CHECK(!batcher_);
-  NewBatcher(session, NULL);
+// Called by a batcher once its flush has finished. Releases the buffer space
+// accounted for the batcher's operations, decrements the number of active
+// batchers and wakes up a thread waiting on condition_ (if any).
+void KuduSession::Data::FlushFinished(Batcher* batcher) {
+  const int64_t bytes_flushed = batcher->buffer_bytes_used();
+  {
+    std::lock_guard<Mutex> l(mutex_);
+    buffer_bytes_used_ -= bytes_flushed;
+    --batchers_num_;
+    // Both KuduSession::Data::ApplyWriteOp() and KuduSession::Data::Flush()
+    // wait on the corresponding condition variable for the total number of
+    // batchers or the buffer byte count to decrease. Since the KuduSession
+    // interface does not advertise thread-safety, at most one such thread
+    // (the one applying or flushing operations on the session) is expected
+    // to be waiting, so it's the only thread to notify.
+    condition_.Signal();
+  }
 }
 
-void KuduSession::Data::NewBatcher(const shared_ptr<KuduSession>& session,
-                                   scoped_refptr<Batcher>* old_batcher) {
-  DCHECK(lock_.is_locked());
+// Closes the session's current batcher by aborting it. Unless 'force' is set,
+// returns Status::IllegalState if the current batcher still has pending
+// operations. Only the current batcher is checked and aborted: batchers which
+// have already been flushed are not touched here.
+Status KuduSession::Data::Close(bool force) {
+  std::lock_guard<Mutex> l(mutex_);
+  if (!batcher_) {
+    return Status::OK();
+  }
+  if (batcher_->HasPendingOperations() && !force) {
+    return Status::IllegalState("Could not close. There are pending operations.");
+  }
+  batcher_->Abort();
+
+  return Status::OK();
+}
 
-  scoped_refptr<Batcher> batcher(
-    new Batcher(client_.get(), error_collector_.get(), session,
-                external_consistency_mode_));
-  if (timeout_ms_ != -1) {
-    batcher->SetTimeoutMillis(timeout_ms_);
+// Sets the external consistency mode used by batchers created afterwards.
+// Returns Status::IllegalState if there are pending (buffered, or flushed but
+// not yet finished) write operations.
+Status KuduSession::Data::SetExternalConsistencyMode(
+    KuduSession::ExternalConsistencyMode m) {
+  std::lock_guard<Mutex> l(mutex_);
+  if (HasPendingOperationsUnlocked()) {
+    // NOTE: this is an artificial restriction.
+    return Status::IllegalState(
+        "Cannot change external consistency mode when writes are buffered");
   }
-  batcher.swap(batcher_);
+  // Thread-safety note: the external_consistency_mode_ is not supposed
+  // to be accessed or modified from any other thread of control:
+  // no thread-safety is assumed for the kudu::KuduSession interface.
+  // However, the lock is needed to check for pending operations because
+  // there may be pending RPCs and the background flush task may be running.
+  external_consistency_mode_ = m;
+  return Status::OK();
+}
 
-  if (old_batcher) {
-    old_batcher->swap(batcher);
+// Sets the flush mode for the session. Returns Status::IllegalState if there
+// are pending write operations. On success, starts the time-based background
+// flush task unless it is already active; the task does real work only in
+// AUTO_FLUSH_BACKGROUND mode.
+Status KuduSession::Data::SetFlushMode(FlushMode mode,
+                                       const sp::weak_ptr<KuduSession>& session) {
+  {
+    std::lock_guard<Mutex> l(mutex_);
+    if (HasPendingOperationsUnlocked()) {
+      // Don't allow to change flush mode otherwise it might lead to
+      // unexpected behavior while working with the KuduSession interface.
+      // E.g., if changing from MANUAL_FLUSH/AUTO_FLUSH_BACKGROUND to
+      // AUTO_FLUSH_SYNC while there are pending operations, on the next call of
+      // KuduSession::Apply() the buffered operations will be flushed along
+      // with the new one, which is not a good predictable behavior.
+      return Status::IllegalState(
+          "Cannot change flush mode when writes are buffered.");
+    }
+    // Thread-safety note: the flush_mode_ is accessed from the background flush
+    // thread for reading, so it should be modified under protection.
+    // There should not be any threads waiting on conditions
+    // which are affected by the setting, so no signalling is necessary here.
+    flush_mode_ = mode;
   }
+
+  TimeBasedFlushInit(session);
+
+  return Status::OK();
 }
 
-void KuduSession::Data::FlushFinished(Batcher* batcher) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  CHECK_EQ(flushed_batchers_.erase(batcher), 1);
+// Sets the session-wide limit on the buffer space used by pending write
+// operations. Returns Status::IllegalState if there are pending operations.
+Status KuduSession::Data::SetBufferBytesLimit(size_t size) {
+  std::lock_guard<Mutex> l(mutex_);
+  if (HasPendingOperationsUnlocked()) {
+    // NOTE: this is an artificial restriction.
+    return Status::IllegalState(
+        "Cannot change buffer size limit when writes are buffered.");
+  }
+  // Thread-safety note: the buffer_bytes_limit_ is not supposed to be accessed
+  // or modified from any other thread of control: no thread-safety is assumed
+  // for the kudu::KuduSession interface. Due to the latter reason,
+  // there should not be any threads waiting on conditions which are affected
+  // by the change, so signalling other threads isn't necessary here.
+  // However, the lock is needed to check for pending operations because
+  // there may be pending RPCs and the background flush task may be running.
+  buffer_bytes_limit_ = size;
+  return Status::OK();
 }
 
-Status KuduSession::Data::Close(bool force) {
-  if (batcher_->HasPendingOperations() && !force) {
-    return Status::IllegalState("Could not close. There are pending operations.");
+// Sets the flush watermark as a percentage of the buffer space limit.
+// Returns Status::InvalidArgument if the value is outside [0, 100] and
+// Status::IllegalState if there are pending write operations.
+Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) {
+  if (watermark_pct < 0 || watermark_pct > 100) {
+    return Status::InvalidArgument(
+        strings::Substitute("$0: watermark must be between 0 and 100 inclusive",
+                            watermark_pct));
   }
-  batcher_->Abort();
+  std::lock_guard<Mutex> l(mutex_);
+  if (HasPendingOperationsUnlocked()) {
+    // NOTE: this is an artificial restriction.
+    return Status::IllegalState(
+        "Cannot change buffer flush watermark when writes are buffered.");
+  }
+  // Thread-safety note: the buffer_watermark_pct_ is not supposed
+  // to be accessed or modified from any other thread of control:
+  // no thread-safety is assumed for the kudu::KuduSession interface.
+  // Due to the latter reason, there should not be any threads waiting on
+  // conditions which are affected by the setting, so no signalling
+  // is necessary here.
+  // However, the lock is needed to check for pending operations because
+  // there may be pending RPCs and the background flush task may be running.
+  buffer_watermark_pct_ = watermark_pct;
+  return Status::OK();
+}
+
+// Sets the maximum age (in milliseconds) a batcher may reach before the
+// time-based background flush task flushes it. Returns Status::IllegalState
+// if there are pending write operations.
+Status KuduSession::Data::SetBufferFlushInterval(unsigned int millis) {
+  std::lock_guard<Mutex> guard(mutex_);
+  if (!HasPendingOperationsUnlocked()) {
+    // flush_interval_ is read by the background flush task, hence it is
+    // updated only while holding mutex_.
+    flush_interval_ = MonoDelta::FromMilliseconds(millis);
+    return Status::OK();
+  }
+  // NOTE: this is an artificial restriction.
+  return Status::IllegalState(
+      "Cannot change buffer flush interval when writes are buffered.");
+}
+
+// Sets the limit on the number of active batchers. 1 is the minimum possible
+// number of batchers per session; 0 means there isn't any limit on the
+// maximum number of batchers. Returns Status::IllegalState if there are
+// pending write operations.
+Status KuduSession::Data::SetMaxBatchersNum(unsigned int max_num) {
+  std::lock_guard<Mutex> l(mutex_);
+  if (HasPendingOperationsUnlocked()) {
+    // NOTE: this is an artificial restriction.
+    return Status::IllegalState(
+        "Cannot change the limit on maximum number of batchers when writes are buffered.");
+  }
+  // Thread-safety note: the batchers_num_limit_ is not supposed
+  // to be accessed or modified from any other thread of control:
+  // no thread-safety is assumed for the kudu::KuduSession interface.
+  // Due to the latter reason, there should not be any threads waiting
+  // on conditions which are affected by the setting, so no signalling
+  // is necessary here.
+  // However, the lock is needed to check for pending operations because
+  // there may be pending RPCs and the background flush task may be running.
+  batchers_num_limit_ = max_num;
   return Status::OK();
 }
 
+// Sets the timeout for write operations; negative values are clamped to 0.
+// The value is remembered for batchers created later and is also applied to
+// the current batcher, if there is one.
+void KuduSession::Data::SetTimeoutMillis(int timeout_ms) {
+  const int effective_timeout_ms = (timeout_ms < 0) ? 0 : timeout_ms;
+  std::lock_guard<Mutex> guard(mutex_);
+  timeout_ms_ = effective_timeout_ms;
+  if (batcher_) {
+    batcher_->SetTimeoutMillis(effective_timeout_ms);
+  }
+}
+
+// Starts flushing the current batcher if it holds any data. 'cb' is invoked
+// once the flush completes, or right away if there is nothing to flush.
+void KuduSession::Data::FlushAsync(KuduStatusCallback* cb) {
+  FlushCurrentBatcher(kWatermarkNonEmptyBatcher, cb);
+}
+
+// Synchronous flush: starts flushing the current batcher (if it has data) and
+// then blocks until every batcher of the session has finished, i.e. until no
+// buffer space is in use anymore. Returns Status::IOError if the session's
+// error collector holds any errors.
+Status KuduSession::Data::Flush() {
+  FlushCurrentBatcher(kWatermarkNonEmptyBatcher, nullptr);
+  {
+    std::lock_guard<Mutex> guard(mutex_);
+    while (buffer_bytes_used_ > 0) {
+      condition_.Wait();
+    }
+  }
+  if (error_collector_->CountErrors()) {
+    return Status::IOError("Some errors occurred");
+  }
+  return Status::OK();
+}
+
+// Returns whether any write operations are pending, i.e. buffered or flushed
+// but not yet finished. mutex_ is taken because buffer_bytes_used_ is also
+// updated from batcher completion callbacks and the background flush task.
+bool KuduSession::Data::HasPendingOperations() const {
+  std::lock_guard<Mutex> guard(mutex_);
+  return HasPendingOperationsUnlocked();
+}
+
+// Same as HasPendingOperations(), but the caller must already hold mutex_.
+bool KuduSession::Data::HasPendingOperationsUnlocked() const {
+  mutex_.AssertAcquired();
+  const bool buffer_in_use = buffer_bytes_used_ > 0;
+  return buffer_in_use;
+}
+
+// Returns the number of operations buffered in the current batcher.
+// Operations of batchers which have already been flushed are "pending",
+// but not "buffered", even if they have not reached the tablet server yet,
+// so they are not counted.
+int KuduSession::Data::CountBufferedOperations() const {
+  std::lock_guard<Mutex> guard(mutex_);
+  if (!batcher_) {
+    return 0;
+  }
+  return batcher_->CountBufferedOperations();
+}
+
+// Flushes the current batcher if it holds at least 'watermark' bytes of
+// operations. Otherwise nothing is flushed and success is reported to 'cb'
+// (when it is non-null) right away.
+void KuduSession::Data::FlushCurrentBatcher(int64_t watermark,
+                                            KuduStatusCallback* cb) {
+  scoped_refptr<Batcher> detached;
+  {
+    std::lock_guard<Mutex> guard(mutex_);
+    const bool reached_watermark =
+        batcher_ && batcher_->buffer_bytes_used() >= watermark;
+    if (reached_watermark) {
+      detached.swap(batcher_);
+    }
+  }
+  if (!detached) {
+    // Nothing to do -- declare a victory.
+    if (cb != nullptr) {
+      cb->Run(Status::OK());
+    }
+    return;
+  }
+  // Start flushing outside of the lock: the callback may itself try to take
+  // the lock, in the case that the batch fails "inline" on the same thread.
+  detached->FlushAsync(cb);
+}
+
+// Flushes the current batcher if it is at least 'max_age' old, measured from
+// the moment its first operation was added. Returns the time left until the
+// current batcher reaches 'max_age'; returns an uninitialized MonoDelta if the
+// batcher was flushed, or if there is no batcher with operations.
+MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
+  MonoDelta time_left;
+  scoped_refptr<Batcher> detached;
+  {
+    std::lock_guard<Mutex> guard(mutex_);
+    if (batcher_) {
+      const MonoTime first_op_time = batcher_->first_op_time();
+      if (PREDICT_TRUE(first_op_time.Initialized())) {
+        const MonoTime deadline = first_op_time + max_age;
+        const MonoTime now = MonoTime::Now();
+        if (deadline <= now) {
+          detached.swap(batcher_);
+        } else {
+          time_left = deadline - now;
+        }
+      }
+    }
+  }
+  if (detached) {
+    // Start flushing outside of the lock: the callback may itself try to take
+    // the lock, in the case that the batch fails "inline" on the same thread.
+    detached->FlushAsync(nullptr);
+  }
+  return time_left;
+}
+
+// This method takes ownership over the specified write operation. On the
+// return from this method, the operation must end up either in the
+// corresponding batcher (success path) or in the error collector (failure
+// path). Otherwise it would be a memory leak.
+//
+// If the operation does not fit into the buffer at all, Status::Incomplete is
+// returned. In AUTO_FLUSH_BACKGROUND mode the call blocks while there is not
+// enough free buffer space for the operation; in other modes a lack of
+// free buffer space is reported as Status::Incomplete. In any mode, if a new
+// batcher is needed but the limit on active batchers has been reached, the
+// call blocks until one of the batchers finishes.
+Status KuduSession::Data::ApplyWriteOp(
+    const sp::weak_ptr<KuduSession>& weak_session,
+    KuduWriteOperation* write_op) {
+  if (!write_op) {
+    return Status::InvalidArgument("NULL operation");
+  }
+  if (!write_op->row().IsKeySet()) {
+    Status status = Status::IllegalState(
+        "Key not specified", write_op->ToString());
+    error_collector_->AddError(
+        gscoped_ptr<KuduError>(new KuduError(write_op, status)));
+    return status;
+  }
+
+  // Get 'wire size' of the write operation.
+  const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op);
+
+  // Thread-safety note: the buffer_bytes_limit_ and
+  // buffer_watermark_pct_ are not supposed to be modified
+  // from any other thread of control since no thread-safety is advertised
+  // for the kudu::KuduSession interface.
+  // So, no protection while accessing those members.
+  const size_t max_size = buffer_bytes_limit_;
+  const size_t flush_watermark =
+      buffer_bytes_limit_ * buffer_watermark_pct_ / 100;
+  // Thread-safety note: the flush_mode_ is accessed from the background
+  // time-based flush task for reading. Practically, it would be possible
+  // to get away with not protecting the flush_mode_ since it's read-only
+  // access here as well, but TSAN does not like that.
+  FlushMode flush_mode;
+  {
+    std::lock_guard<Mutex> l(mutex_);
+    flush_mode = flush_mode_;
+  }
+
+  // A sanity check: before trying to validate against any of run-time metrics,
+  // verify that the single operation can fit into an empty buffer
+  // given the restriction on the buffer size.
+  if (required_size > max_size) {
+    Status s = Status::Incomplete(strings::Substitute(
+          "buffer size limit is too small to fit operation: "
+          "required $0, size limit $1",
+          required_size, max_size));
+    error_collector_->AddError(
+        gscoped_ptr<KuduError>(new KuduError(write_op, s)));
+    return s;
+  }
+
+  if (flush_mode == AUTO_FLUSH_BACKGROUND) {
+    if (PREDICT_TRUE(buffer_pre_flush_enabled_)) {
+      // NOTE: the buffer_pre_flush_enabled_ is set to false only in tests.
+      //
+      // In need of an extra flush in some cases like shown in the diagram
+      // below, otherwise it will require waiting for the time-based flush
+      // to happen. Waiting for the time-based flush delays the stream
+      // of incoming write operations if such situation happens in the middle.
+      // The diagram shows the data layout in the buffer, the flush watermark,
+      // and the incoming write operation:
+      //                                             +----required_size-----+
+      //                                             |                      |
+      //                                             | Data of the          |
+      //                   +-------max_size-------+  | operation to add.    |
+      // flush_watermark-> |                      |  |                      |
+      //                   +--buffer_bytes_used---+  +----------0-----------+
+      //                   |                      |
+      //                   | Data of fresh (newly |
+      //                   | added) operations.   |
+      //                   |                      |
+      //                   +----------------------+
+      //                   | Data of operations   |
+      //                   | being flushed now.   |
+      //                   +----------0-----------+
+      //
+      // The current batcher is flushed if adding the operation to it would
+      // make it exceed max_size.
+      FlushCurrentBatcher(max_size - required_size + 1, nullptr);
+    }
+  }
+  {
+    std::lock_guard<Mutex> l(mutex_);
+    if (flush_mode == AUTO_FLUSH_BACKGROUND) {
+      // In AUTO_FLUSH_BACKGROUND mode Apply() blocks if total would-be-used
+      // buffer space is over the limit. Once amount of buffered data drops
+      // below the limit, a blocking call to Apply() is unblocked.
+      while (buffer_bytes_used_ + required_size > max_size) {
+        condition_.Wait();
+      }
+    } else if (PREDICT_FALSE(buffer_bytes_used_ + required_size > max_size)) {
+      Status s = Status::Incomplete(strings::Substitute(
+          "not enough mutation buffer space remaining for operation: "
+          "required additional $0 when $1 of $2 already used",
+          required_size, buffer_bytes_used_, max_size));
+      error_collector_->AddError(
+          gscoped_ptr<KuduError>(new KuduError(write_op, s)));
+      return s;
+    }
+
+    // Add the operation to the current batcher. If the current batcher
+    // is not there, allocate one and set it to be current.
+    if (!batcher_) {
+      while (batchers_num_limit_ != 0 &&
+             batchers_num_ >= batchers_num_limit_) {
+        // Wait until it's possible to add a new batcher given the limit
+        // on the maximum outstanding batchers per session.
+        condition_.Wait();
+      }
+      DCHECK(!batcher_);
+      // Thread-safety note: the external_consistency_mode_ and timeout_ms_
+      // are not supposed to be accessed or modified from any other thread
+      // of control since no thread-safety is advertised
+      // for the kudu::KuduSession interface.
+      scoped_refptr<Batcher> batcher(
+          new Batcher(client_.get(), error_collector_, weak_session,
+                      external_consistency_mode_));
+      if (timeout_ms_ != -1) {
+        batcher->SetTimeoutMillis(timeout_ms_);
+      }
+      batcher.swap(batcher_);
+      ++batchers_num_;
+    }
+    Status op_add_status = batcher_->Add(write_op);
+    if (PREDICT_FALSE(!op_add_status.ok())) {
+      error_collector_->AddError(
+          gscoped_ptr<KuduError>(new KuduError(write_op, op_add_status)));
+      return op_add_status;
+    }
+    // Finally, update the buffer space usage.
+    buffer_bytes_used_ += required_size;
+  }
+
+  if (flush_mode == AUTO_FLUSH_BACKGROUND) {
+    // In AUTO_FLUSH_BACKGROUND mode it's necessary to flush the newly added
+    // operations if the flush watermark is reached. The current batcher is
+    // the exclusive and the only container for the newly added operations.
+    // All other batchers, if any, contain operations which are scheduled
+    // to be sent or already on their way to corresponding tablet servers.
+    FlushCurrentBatcher(flush_watermark, nullptr);
+  }
+
+  return Status::OK();
+}
+
+// Kicks off the time-based background flush task for the session unless it
+// is already active. The task stops by itself unless the session is in
+// AUTO_FLUSH_BACKGROUND mode.
+void KuduSession::Data::TimeBasedFlushInit(
+    sp::weak_ptr<KuduSession> weak_session) {
+  const bool do_startup_check = true;
+  TimeBasedFlushTask(Status::OK(), messenger_, weak_session, do_startup_check);
+}
+
+// The self-rescheduling task which flushes the current batcher once it has
+// been accumulating operations for flush_interval_. It does real work only in
+// AUTO_FLUSH_BACKGROUND mode; otherwise it marks itself inactive and stops.
+//
+// Only weak references to the session and the messenger are bound into the
+// re-scheduled task. Binding strong references would keep the session, and
+// the messenger that runs the task, alive for as long as the task keeps
+// re-scheduling itself.
+void KuduSession::Data::TimeBasedFlushTask(
+    const Status& status,
+    std::weak_ptr<rpc::Messenger> weak_messenger,
+    sp::weak_ptr<KuduSession> weak_session,
+    bool do_startup_check) {
+  if (PREDICT_FALSE(!status.ok())) {
+    // Presumably the scheduled run was aborted by the reactor: don't
+    // re-schedule.
+    return;
+  }
+  // Check that the session is still alive to access the data safely.
+  sp::shared_ptr<KuduSession> session(weak_session.lock());
+  if (PREDICT_FALSE(!session)) {
+    return;
+  }
+
+  KuduSession::Data* data = session->data_;
+  MonoDelta max_batcher_age;
+  {
+    std::lock_guard<Mutex> l(data->mutex_);
+    if (do_startup_check && data->flush_task_active_) {
+      // The task is already active.
+      return;
+    }
+    if (data->flush_mode_ == AUTO_FLUSH_BACKGROUND) {
+      data->flush_task_active_ = true;
+    } else {
+      // Flush mode could change during the operation. If current mode
+      // is no longer AUTO_FLUSH_BACKGROUND, do not re-schedule the task.
+      data->flush_task_active_ = false;
+      return;
+    }
+    max_batcher_age = data->flush_interval_;
+  }
+
+  // Let's measure the age of a batcher as the time elapsed from the moment
+  // of adding the very first operation into the batcher.
+  // The idea is to flush the batcher when its age is very close to the
+  // flush_interval_: let's call it 'batcher flush age'. If current batcher
+  // hasn't reached its flush age yet, just re-schedule the task to
+  // re-evaluate the age of then-will-be-current batcher. So, if the current
+  // batcher is still current at that time, it will be exactly of its flush age.
+  MonoDelta time_left = data->FlushCurrentBatcher(max_batcher_age);
+  MonoDelta next_run = time_left.Initialized() ? time_left : max_batcher_age;
+
+  // Re-schedule the task to check and flush the current batcher
+  // when its age is closer to the flush_interval_. Bind the weak pointers
+  // (not 'messenger'/'session') so the pending task doesn't own either object.
+  std::shared_ptr<rpc::Messenger> messenger(weak_messenger.lock());
+  if (PREDICT_TRUE(messenger)) {
+    messenger->ScheduleOnReactor(
+        boost::bind(&KuduSession::Data::TimeBasedFlushTask,
+                    _1, weak_messenger, weak_session, false),
+        next_run);
+  }
+}
+
+// Test-only accessor: the total size of pending operations, both freshly
+// added and in the process of being flushed.
+int64_t KuduSession::Data::GetPendingOperationsSizeForTests() const {
+  std::lock_guard<Mutex> guard(mutex_);
+  const int64_t pending_bytes = buffer_bytes_used_;
+  return pending_bytes;
+}
+
+// Test-only accessor: the number of active batchers in the session.
+size_t KuduSession::Data::GetBatchersCountForTests() const {
+  std::lock_guard<Mutex> guard(mutex_);
+  const size_t active_batchers = batchers_num_;
+  return active_batchers;
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index d839314..e050301 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -17,13 +17,19 @@
 #ifndef KUDU_CLIENT_SESSION_INTERNAL_H
 #define KUDU_CLIENT_SESSION_INTERNAL_H
 
-#include <unordered_set>
+#include <memory>
 
 #include "kudu/client/client.h"
-#include "kudu/util/locks.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
 
 namespace kudu {
 
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
 namespace client {
 
 namespace internal {
@@ -31,17 +37,27 @@ class Batcher;
 class ErrorCollector;
 } // internal
 
+// This class contains the code to do the heavy-lifting for the
+// kudu::KuduSession-related operations. Its interface does not assume
+// thread-safety in general, but it's thread-safe regarding the following
+// concurrent actions:
+//
+//  * calls to KuduSession::Apply():
+//    (there is at most one call at any moment, no concurrency is assumed here).
+//
+//  * activity of the time-based background flush task
+//    (there is at most one task running at any moment).
+//
+//  * calls from messenger/reactor threads upon completion of the flushed
+//    operations to the corresponding tablet server
+//    (there can be multiple of those at any moment).
+//
 class KuduSession::Data {
  public:
-  explicit Data(sp::shared_ptr<KuduClient> client);
-  ~Data();
-
-  void Init(const sp::shared_ptr<KuduSession>& session);
+  // Creates the session's internal state for the given client. The messenger
+  // is used to schedule the time-based background flush task.
+  explicit Data(sp::shared_ptr<KuduClient> client,
+                std::shared_ptr<rpc::Messenger> messenger);
 
-  // Swap in a new Batcher instance, returning the old one in '*old_batcher', unless it is
-  // NULL.
-  void NewBatcher(const sp::shared_ptr<KuduSession>& session,
-                  scoped_refptr<internal::Batcher>* old_batcher);
+  // Second-stage initialization which needs a weak reference to the owning
+  // session: starts the time-based background flush task (if needed).
+  void Init(const sp::weak_ptr<KuduSession>& session);
 
   // Called by Batcher when a flush has finished.
   void FlushFinished(internal::Batcher* b);
@@ -51,35 +67,162 @@ class KuduSession::Data {
   // operations.
   Status Close(bool force);
 
+  // Set flush mode for the session.
+  Status SetFlushMode(FlushMode mode, const sp::weak_ptr<KuduSession>& session);
+
+  // Set external consistency mode for the session.
+  Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);
+
+  // Set limit on buffer space consumed by buffered write operations.
+  Status SetBufferBytesLimit(size_t size);
+
+  // Set buffer flush watermark (in percentage of the total buffer space).
+  // The value must be in the range [0, 100].
+  Status SetBufferFlushWatermark(int32_t watermark_pct);
+
+  // Set the interval of the background max-wait flushing (in milliseconds).
+  Status SetBufferFlushInterval(unsigned int period_ms);
+
+  // Set the limit on maximum number of batchers with pending operations.
+  // 0 means there is no limit.
+  Status SetMaxBatchersNum(unsigned int max_num);
+
+  // Set timeout for write operations, in milliseconds.
+  // Negative values are treated as 0.
+  void SetTimeoutMillis(int timeout_ms);
+
+  // Initiate flushing of the current batcher and invoke the specified callback
+  // once the flushing is finished.
+  void FlushAsync(KuduStatusCallback* cb);
+
+  // Initiate flushing of the current batcher and await until all batchers
+  // complete flushing. The return value is Status::IOError() if the session's
+  // error collector holds any errors and Status::OK() otherwise.
+  Status Flush();
+
+  // Check whether there are pending operations: buffered ones and those
+  // flushed but not yet finished.
+  bool HasPendingOperations() const;
+  // Same as HasPendingOperations(), but the caller must hold mutex_.
+  bool HasPendingOperationsUnlocked() const;
+
+  // Get number of operations buffered in the current batcher (operations of
+  // already flushed batchers are not counted).
+  int CountBufferedOperations() const;
+
+  // Initiate flushing of the current batcher if its accumulated operations'
+  // on-the-wire size has reached the specified watermark. The result
+  // of the asynchronous flushing is reported via the specified callback.
+  // Even if the callback is null (nullptr), that does not mean the errors
+  // are dropped on the floor -- in case of an error, corresponding information
+  // is added into the session's error collector and can be retrieved later.
+  void FlushCurrentBatcher(int64_t watermark,
+                           KuduStatusCallback* cb);
+
+  // Initiate flushing of the current batcher if it has reached the specified
+  // age. If the current batcher is present but it hasn't reached
+  // the specified age yet, just return the amount of time left until it reaches
+  // the specified age, not flushing the batcher. If the current batcher is
+  // of the specified age or older, flush the batcher and return uninitialized
+  // MonoDelta object. If there isn't current batcher (or it has no operations),
+  // return uninitialized MonoDelta object.
+  MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);
+
+  // Apply a write operation, i.e. push it through the batcher chain.
+  // Takes ownership of 'write_op'.
+  Status ApplyWriteOp(const sp::weak_ptr<KuduSession>& session,
+                      KuduWriteOperation* write_op);
+
+  // Check and start the time-based flush task in background, if necessary.
+  void TimeBasedFlushInit(sp::weak_ptr<KuduSession> weak_session);
+
+  // The self-rescheduling task to flush write operations which have been
+  // accumulating for too long (controlled by flush_interval_).
+  // This does real work only in case of AUTO_FLUSH_BACKGROUND mode.
+  // This method is used to initiate/re-initiate the run of the task
+  // and re-schedule the task from within. The 'do_startup_check' parameter
+  // must be set to 'true' when the method is called not from the task thread.
+  static void TimeBasedFlushTask(const Status& status,
+                                 std::weak_ptr<rpc::Messenger> weak_messenger,
+                                 sp::weak_ptr<KuduSession> weak_session,
+                                 bool do_startup_check);
+
+  // Get the total size of pending (i.e. both freshly added and
+  // in process of being flushed) operations. This method is used by tests only.
+  int64_t GetPendingOperationsSizeForTests() const;
+
+  // Get the total number of batchers in the session.
+  // This method is used by tests only.
+  size_t GetBatchersCountForTests() const;
+
+  // This constant represents a meaningful name for the first argument in
+  // expressions like FlushCurrentBatcher(1, cbk): this is the watermark
+  // corresponding to 1 byte of data. This watermark level is the minimum
+  // possible for a non-empty batcher, so any non-empty batcher will be flushed
+  // if calling FlushCurrentBatcher() using this watermark.
+  static const int64_t kWatermarkNonEmptyBatcher = 1;
+
   // The client that this session is associated with.
   const sp::shared_ptr<KuduClient> client_;
 
-  // Lock protecting internal state.
-  // Note that this lock should not be taken if the thread is already holding
-  // a Batcher lock. This must be acquired first.
-  mutable simple_spinlock lock_;
+  // The reference to the client's messenger (keeping the reference instead of
+  // declaring friendship to KuduClient and accessing it via the client_).
+  std::shared_ptr<rpc::Messenger> messenger_;
 
   // Buffer for errors.
   scoped_refptr<internal::ErrorCollector> error_collector_;
 
-  // The current batcher being prepared.
-  scoped_refptr<internal::Batcher> batcher_;
-
-  // Any batchers which have been flushed but not yet finished.
-  //
-  // Upon a batch finishing, it will call FlushFinished(), which removes the batcher from
-  // this set. This set does not hold any reference count to the Batcher, since, while
-  // the flush is active, the batcher manages its own refcount. The Batcher will always
-  // call FlushFinished() before it destructs itself, so we're guaranteed that these
-  // pointers stay valid.
-  std::unordered_set<internal::Batcher*> flushed_batchers_;
-
-  FlushMode flush_mode_;
   kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_;
 
   // Timeout for the next batch.
   int timeout_ms_;
 
+  // Interval for the max-wait flush background task.
+  MonoDelta flush_interval_;  // protected by mutex_
+
+  // Whether the flush task is active/scheduled.
+  bool flush_task_active_;  // protected by mutex_
+
+  // Current flush mode for the session's data.
+  FlushMode flush_mode_;  // protected by mutex_
+
+  // Mutex for the condition_ member (the condition variable).
+  // This lock protects variables from simultaneous access:
+  // batcher- and byte-counting members, data flow control and other variables
+  // whose modification requires to notify the thread which might be waiting
+  // on the 'condition_' variable.
+  mutable Mutex mutex_;
+
+  // Condition variable used by the code which allocates the next batcher
+  // and counts the bytes used by the buffered write operations,
+  // i.e. it is used by the data flow control logic while applying/adding
+  // new write operations and while waiting for flushes to complete
+  // (based on mutex_).
+  ConditionVariable condition_;
+
+  // The current batcher being prepared.
+  scoped_refptr<internal::Batcher> batcher_;  // protected by mutex_
+
+  // Total number of active batchers. Include the current batcher accumulating
+  // the newly applied operations, and other batchers with not yet flushed
+  // or flushed but not yet finished operations.
+  size_t batchers_num_;  // protected by mutex_
+
+  // Limit on the number of active batchers; 0 means there is no limit.
+  size_t batchers_num_limit_;
+
+  // Session-wide limit on total size of buffer used by all batched write
+  // operations. The buffer is a virtual entity: there isn't contiguous place
+  // in the memory which would contain that 'buffered' data. Instead, buffer's
+  // data is spread across all pending operations in all active batchers.
+  size_t buffer_bytes_limit_;
+
+  // The high-watermark level as the percentage of the buffer space used by
+  // freshly added (not-yet-scheduled-for-flush) write operations.
+  // Once the level is reached, ApplyWriteOp() triggers flushing of the
+  // accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode.
+  int32_t buffer_watermark_pct_;
+
+  // The total number of bytes used by pending write operations: both
+  // buffered ones and those flushed but not yet finished.
+  int64_t buffer_bytes_used_;  // protected by mutex_
+
+ private:
+  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
+
+  bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios.
+
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/write_op.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc
index 25891d9..bfb9550 100644
--- a/src/kudu/client/write_op.cc
+++ b/src/kudu/client/write_op.cc
@@ -41,8 +41,8 @@ RowOperationsPB_Type ToInternalWriteType(KuduWriteOperation::Type type) {
 
 KuduWriteOperation::KuduWriteOperation(const shared_ptr<KuduTable>& table)
   : table_(table),
-    row_(table->schema().schema_) {
-}
+    row_(table->schema().schema_),
+    size_in_buffer_(0) {}  // 0: SizeInBuffer() has not been computed yet.
 
 KuduWriteOperation::~KuduWriteOperation() {}
 
@@ -59,6 +59,11 @@ EncodedKey* KuduWriteOperation::CreateKey() const {
 }
 
 int64_t KuduWriteOperation::SizeInBuffer() const {
+  if (size_in_buffer_ > 0) {
+    // Once computed, the raw size of the operation is cached and returned
+    // for all subsequent calls.
+    return size_in_buffer_;
+  }
   const Schema* schema = row_.schema();
   int size = 1; // for the operation type
 
@@ -78,6 +83,7 @@ int64_t KuduWriteOperation::SizeInBuffer() const {
       }
     }
   }
+  size_in_buffer_ = size;
   return size;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/write_op.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index b534f1d..16fe5af 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -109,9 +109,12 @@ class KUDU_EXPORT KuduWriteOperation {
   const KuduTable* table() const { return table_.get(); }
 
   // Return the number of bytes required to buffer this operation,
-  // including direct and indirect data.
+  // including direct and indirect data. Once called, the result is cached
+  // so subsequent calls will return the size previously computed.
   int64_t SizeInBuffer() const;
 
+  mutable int64_t size_in_buffer_;  // Cached SizeInBuffer() result; 0 until computed.
+
   DISALLOW_COPY_AND_ASSIGN(KuduWriteOperation);
 };
 

Reply via email to