http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/session-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc index 552b507..8da7679 100644 --- a/src/kudu/client/session-internal.cc +++ b/src/kudu/client/session-internal.cc @@ -20,64 +20,514 @@ #include <mutex> #include "kudu/client/batcher.h" +#include "kudu/client/callbacks.h" #include "kudu/client/error_collector.h" #include "kudu/client/shared_ptr.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/messenger.h" namespace kudu { +using rpc::Messenger; + namespace client { using internal::Batcher; using internal::ErrorCollector; using sp::shared_ptr; +using sp::weak_ptr; + -KuduSession::Data::Data(shared_ptr<KuduClient> client) +KuduSession::Data::Data(shared_ptr<KuduClient> client, + std::shared_ptr<rpc::Messenger> messenger) : client_(std::move(client)), + messenger_(std::move(messenger)), error_collector_(new ErrorCollector()), - flush_mode_(AUTO_FLUSH_SYNC), external_consistency_mode_(CLIENT_PROPAGATED), - timeout_ms_(-1) { + timeout_ms_(-1), + flush_interval_(MonoDelta::FromMilliseconds(1000)), + flush_task_active_(false), + flush_mode_(AUTO_FLUSH_SYNC), + condition_(&mutex_), + batchers_num_(0), + batchers_num_limit_(2), + buffer_bytes_limit_(7 * 1024 * 1024), + buffer_watermark_pct_(80), + buffer_bytes_used_(0), + buffer_pre_flush_enabled_(true) { } -KuduSession::Data::~Data() { +void KuduSession::Data::Init(const weak_ptr<KuduSession>& session) { + TimeBasedFlushInit(session); } -void KuduSession::Data::Init(const shared_ptr<KuduSession>& session) { - std::lock_guard<simple_spinlock> l(lock_); - CHECK(!batcher_); - NewBatcher(session, NULL); +void KuduSession::Data::FlushFinished(Batcher* batcher) { + const int64_t bytes_flushed = batcher->buffer_bytes_used(); + { + std::lock_guard<Mutex> l(mutex_); + buffer_bytes_used_ -= bytes_flushed; +
--batchers_num_; + // The logic of KuduSession::ApplyWriteOp() needs to know + // if total number of batchers or buffer byte count decreases. + // There can be a thread waiting on the corresponding condition + // variable: the thread which runs KuduSession::Apply(), and + // since KuduSession interface does not advertise thread-safety, it's + // the only thread to notify. + condition_.Signal(); + } } -void KuduSession::Data::NewBatcher(const shared_ptr<KuduSession>& session, - scoped_refptr<Batcher>* old_batcher) { - DCHECK(lock_.is_locked()); +Status KuduSession::Data::Close(bool force) { + std::lock_guard<Mutex> l(mutex_); + if (!batcher_) { + return Status::OK(); + } + if (batcher_->HasPendingOperations() && !force) { + return Status::IllegalState("Could not close. There are pending operations."); + } + batcher_->Abort(); + + return Status::OK(); +} - scoped_refptr<Batcher> batcher( - new Batcher(client_.get(), error_collector_.get(), session, - external_consistency_mode_)); - if (timeout_ms_ != -1) { - batcher->SetTimeoutMillis(timeout_ms_); +Status KuduSession::Data::SetExternalConsistencyMode( + KuduSession::ExternalConsistencyMode m) { + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // NOTE: this is an artificial restriction. + return Status::IllegalState( + "Cannot change external consistency mode when writes are buffered"); } - batcher.swap(batcher_); + // Thread-safety note: the external_consistency_mode_ is not supposed + // to be accessed or modified from any other thread of control: + // no thread-safety is assumed for the kudu::KuduSession interface. + // However, the lock is needed to check for pending operations because + // there may be pending RPCs and the background flush task may be running.
+ external_consistency_mode_ = m; + return Status::OK(); +} - if (old_batcher) { - old_batcher->swap(batcher); +Status KuduSession::Data::SetFlushMode(FlushMode mode, + const sp::weak_ptr<KuduSession>& session) { + { + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // Don't allow to change flush mode otherwise it might lead to + // unexpected behavior while working with the KuduSession interface. + // E.g., if changing from MANUAL_FLUSH/AUTO_FLUSH_BACKGROUND to + // AUTO_FLUSH_SYNC while there are pending operations, on the next call of + // KuduSession::Apply() the buffered operations will be flushed along + // with the new one, which is not a good predictable behavior. + return Status::IllegalState( + "Cannot change flush mode when writes are buffered."); + } + // Thread-safety note: the flush_mode_ is accessed from the background flush + // thread for reading, so it should be modified under protection. + // There should not be any threads waiting on conditions + // which are affected by the setting, so no signalling is necessary here. + flush_mode_ = mode; } + + TimeBasedFlushInit(session); + + return Status::OK(); } -void KuduSession::Data::FlushFinished(Batcher* batcher) { - std::lock_guard<simple_spinlock> l(lock_); - CHECK_EQ(flushed_batchers_.erase(batcher), 1); +Status KuduSession::Data::SetBufferBytesLimit(size_t size) { + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // NOTE: this is an artificial restriction. + return Status::IllegalState( + "Cannot change buffer size limit when writes are buffered."); + } + // Thread-safety note: the buffer_bytes_limit_ is not supposed to be accessed + // or modified from any other thread of control: no thread-safety is assumed + // for the kudu::KuduSession interface. Due to the latter reason, + // there should not be any threads waiting on conditions which are affected + // by the change, so signalling other threads isn't necessary here.
+ // However, the lock is needed to check for pending operations because + // there may be pending RPCs and the background flush task may be running. + buffer_bytes_limit_ = size; + return Status::OK(); } -Status KuduSession::Data::Close(bool force) { - if (batcher_->HasPendingOperations() && !force) { - return Status::IllegalState("Could not close. There are pending operations."); +Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) { + if (watermark_pct < 0 || watermark_pct > 100) { + return Status::InvalidArgument( + strings::Substitute("$0: watermark must be between 0 and 100 inclusive", + watermark_pct)); } - batcher_->Abort(); + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // NOTE: this is an artificial restriction. + return Status::IllegalState( + "Cannot change buffer flush watermark when writes are buffered."); + } + // Thread-safety note: the buffer_watermark_pct_ is not supposed + // to be accessed or modified from any other thread of control: + // no thread-safety is assumed for the kudu::KuduSession interface. + // Due to the latter reason, there should not be any threads waiting on + // conditions which are affected by the setting, so no signalling + // is necessary here. + // However, the lock is needed to check for pending operations because + // there may be pending RPCs and the background flush task may be running. + buffer_watermark_pct_ = watermark_pct; + return Status::OK(); +} + +Status KuduSession::Data::SetBufferFlushInterval(unsigned int millis) { + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // NOTE: this is an artificial restriction. + return Status::IllegalState( + "Cannot change buffer flush interval when writes are buffered."); + } + // Thread-safety note: the flush_interval_ is accessed from the background + // flush thread for reading, so it should be modified under protection.
+ flush_interval_ = MonoDelta::FromMilliseconds(millis); + return Status::OK(); +} + +Status KuduSession::Data::SetMaxBatchersNum(unsigned int max_num) { + // 1 is the minimum possible number of batchers per session. + // 0 means there isn't any limit on the maximum number of batchers. + std::lock_guard<Mutex> l(mutex_); + if (HasPendingOperationsUnlocked()) { + // NOTE: this is an artificial restriction. + return Status::IllegalState( + "Cannot change the limit on maximum number of batchers when writes are buffered."); + } + // Thread-safety note: the batchers_num_limit_ is not supposed + // to be accessed or modified from any other thread of control: + // no thread-safety is assumed for the kudu::KuduSession interface. + // Due to the latter reason, there should not be any threads waiting + // on conditions which are affected by the setting, so no signalling + // is necessary here. + // However, the lock is needed to check for pending operations because + // there may be pending RPCs and the background flush task may be running. + batchers_num_limit_ = max_num; return Status::OK(); } +void KuduSession::Data::SetTimeoutMillis(int timeout_ms) { + if (timeout_ms < 0) { + timeout_ms = 0; + } + { + std::lock_guard<Mutex> l(mutex_); + timeout_ms_ = timeout_ms; + if (batcher_) { + batcher_->SetTimeoutMillis(timeout_ms); + } + } +} + +void KuduSession::Data::FlushAsync(KuduStatusCallback* cb) { + // Flush the current batcher if it is non-empty. + FlushCurrentBatcher(kWatermarkNonEmptyBatcher, cb); +} + +Status KuduSession::Data::Flush() { + // The synchronous flush should initiate flushing of the current batcher, + // if it exists and has some data, and wait for flush completion of + // all session's batchers. + FlushCurrentBatcher(kWatermarkNonEmptyBatcher, nullptr); + { + std::lock_guard<Mutex> l(mutex_); + while (buffer_bytes_used_ > 0) { + condition_.Wait(); + } + } + return error_collector_->CountErrors() + ?
Status::IOError("Some errors occurred") : Status::OK(); +} + +bool KuduSession::Data::HasPendingOperations() const { + // Thread-safety note: the buffer_bytes_used_ can be accessed or modified + // from the threads busy with pending RPCs or from the background flush task. + std::lock_guard<Mutex> l(mutex_); + return HasPendingOperationsUnlocked(); +} + +bool KuduSession::Data::HasPendingOperationsUnlocked() const { + mutex_.AssertAcquired(); + return buffer_bytes_used_ > 0; +} + +int KuduSession::Data::CountBufferedOperations() const { + std::lock_guard<Mutex> l(mutex_); + if (batcher_) { + // Prior batchers (if any) with pending operations are not relevant here: + // the flushed operations, even if they have not reached the tablet server, + // are not considered "buffered". Yes, they are "pending", + // but not "buffered". + return batcher_->CountBufferedOperations(); + } + return 0; +} + +void KuduSession::Data::FlushCurrentBatcher(int64_t watermark, + KuduStatusCallback* cb) { + scoped_refptr<Batcher> batcher_to_flush; + { + std::lock_guard<Mutex> l(mutex_); + if (batcher_ && batcher_->buffer_bytes_used() >= watermark) { + batcher_to_flush.swap(batcher_); + } + } + if (batcher_to_flush) { + // Send off the buffered data. Important to do this outside of the lock + // since the callback may itself try to take the lock, in the case that + // the batch fails "inline" on the same thread. + batcher_to_flush->FlushAsync(cb); + } else { + // Nothing to do -- declare a victory.
+ if (cb) { + cb->Run(Status::OK()); + } + } +} + +MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) { + MonoDelta time_left; + scoped_refptr<Batcher> batcher_to_flush; + { + std::lock_guard<Mutex> l(mutex_); + if (batcher_) { + const MonoTime first_op_time = batcher_->first_op_time(); + if (PREDICT_TRUE(first_op_time.Initialized())) { + const MonoTime now = MonoTime::Now(); + if (first_op_time + max_age <= now) { + batcher_to_flush.swap(batcher_); + } else { + time_left = first_op_time + max_age - now; + } + } + } + } + if (batcher_to_flush) { + // Send off the buffered data. Important to do this outside of the lock + // since the callback may itself try to take the lock, in the case that + // the batch fails "inline" on the same thread. + batcher_to_flush->FlushAsync(nullptr); + } + return time_left; +} + +// This method takes ownership over the specified write operation. On the return +// from this method, the operation must end up either in the corresponding +// batcher (success path) or in the error collector (failure path). Otherwise +// it would be a memory leak. +Status KuduSession::Data::ApplyWriteOp( + const sp::weak_ptr<KuduSession>& weak_session, + KuduWriteOperation* write_op) { + + if (!write_op) { + return Status::InvalidArgument("NULL operation"); + } + if (!write_op->row().IsKeySet()) { + Status status = Status::IllegalState( + "Key not specified", write_op->ToString()); + error_collector_->AddError( + gscoped_ptr<KuduError>(new KuduError(write_op, status))); + return status; + } + + // Get 'wire size' of the write operation. + const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op); + + // Thread-safety note: the buffer_bytes_limit_ and + // buffer_watermark_pct_ are not supposed to be modified + // from any other thread of control since no thread-safety is advertised + // for the kudu::KuduSession interface. + // So, no protection while accessing those members.
+ const size_t max_size = buffer_bytes_limit_; + const size_t flush_watermark = + buffer_bytes_limit_ * buffer_watermark_pct_ / 100; + // Thread-safety note: the flush_mode_ is accessed from the background + // time-based flush task for reading. Practically, it would be possible + // to get away with not protecting the flush_mode_ since it's read-only + // access here as well, but TSAN does not like that. + FlushMode flush_mode; + { + std::lock_guard<Mutex> l(mutex_); + flush_mode = flush_mode_; + } + + // A sanity check: before trying to validate against any of run-time metrics, + // verify that the single operation can fit into an empty buffer + // given the restriction on the buffer size. + if (required_size > max_size) { + Status s = Status::Incomplete(strings::Substitute( + "buffer size limit is too small to fit operation: " + "required $0, size limit $1", + required_size, max_size)); + error_collector_->AddError( + gscoped_ptr<KuduError>(new KuduError(write_op, s))); + return s; + } + + if (flush_mode == AUTO_FLUSH_BACKGROUND) { + if (PREDICT_TRUE(buffer_pre_flush_enabled_)) { + // NOTE: the buffer_pre_flush_enabled_ is set to false only in tests. + // + // In need of an extra flush in some cases like shown in the diagram + // below, otherwise it will require waiting for the time-based flush + // to happen. Waiting for the time-based flush delays the stream + // of incoming write operations if such situation happens in the middle. + // The diagram shows the data layout in the buffer, the flush watermark, + // and the incoming write operation: + // +----required_size-----+ + // | | + // | Data of the | + // +-------max_size-------+ | operation to add. | + // flush_watermark-> | | | | + // +--buffer_bytes_used---+ +----------0-----------+ + // | | + // | Data of fresh (newly | + // | added) operations. | + // | | + // +----------------------+ + // | Data of operations | + // | being flushed now.
| + // +----------0-----------+ + FlushCurrentBatcher(max_size - required_size + 1, nullptr); + } + } + { + std::lock_guard<Mutex> l(mutex_); + if (flush_mode == AUTO_FLUSH_BACKGROUND) { + // In AUTO_FLUSH_BACKGROUND mode Apply() blocks if total would-be-used + // buffer space is over the limit. Once amount of buffered data drops + // below the limit, a blocking call to Apply() is unblocked. + while (buffer_bytes_used_ + required_size > max_size) { + condition_.Wait(); + } + } else if (PREDICT_FALSE(buffer_bytes_used_ + required_size > max_size)) { + Status s = Status::Incomplete(strings::Substitute( + "not enough mutation buffer space remaining for operation: " + "required additional $0 when $1 of $2 already used", + required_size, buffer_bytes_used_, max_size)); + error_collector_->AddError( + gscoped_ptr<KuduError>(new KuduError(write_op, s))); + return s; + } + + // Add the operation to the current batcher. If the current batcher + // is not there, allocate one and set it to be current. + if (!batcher_) { + while (batchers_num_limit_ != 0 && + batchers_num_ >= batchers_num_limit_) { + // Wait until it's possible to add a new batcher given the limit + // on the maximum outstanding batchers per session. + condition_.Wait(); + } + DCHECK(!batcher_); + // Thread-safety note: the external_consistency_mode_ and timeout_ms_ + // are not supposed to be accessed or modified from any other thread + // of control since no thread-safety is advertised + // for the kudu::KuduSession interface.
+ scoped_refptr<Batcher> batcher( + new Batcher(client_.get(), error_collector_, weak_session, + external_consistency_mode_)); + if (timeout_ms_ != -1) { + batcher->SetTimeoutMillis(timeout_ms_); + } + batcher.swap(batcher_); + ++batchers_num_; + } + Status op_add_status = batcher_->Add(write_op); + if (PREDICT_FALSE(!op_add_status.ok())) { + error_collector_->AddError( + gscoped_ptr<KuduError>(new KuduError(write_op, op_add_status))); + return op_add_status; + } + // Finally, update the buffer space usage. + buffer_bytes_used_ += required_size; + } + + if (flush_mode == AUTO_FLUSH_BACKGROUND) { + // In AUTO_FLUSH_BACKGROUND mode it's necessary to flush the newly added + // operations if the flush watermark is reached. The current batcher is + // the exclusive and the only container for the newly added operations. + // All other batchers, if any, contain operations which are scheduled + // to be sent or already on their way to corresponding tablet servers. + FlushCurrentBatcher(flush_watermark, nullptr); + } + + return Status::OK(); +} + +void KuduSession::Data::TimeBasedFlushInit( + sp::weak_ptr<KuduSession> weak_session) { + KuduSession::Data::TimeBasedFlushTask( + Status::OK(), messenger_, weak_session, true); +} + +void KuduSession::Data::TimeBasedFlushTask( + const Status& status, + std::weak_ptr<rpc::Messenger> weak_messenger, + sp::weak_ptr<KuduSession> weak_session, + bool do_startup_check) { + if (PREDICT_FALSE(!status.ok())) { + return; + } + // Check that the session is still alive to access the data safely. + sp::shared_ptr<KuduSession> session(weak_session.lock()); + if (PREDICT_FALSE(!session)) { + return; + } + + KuduSession::Data* data = session->data_; + MonoDelta max_batcher_age; + { + std::lock_guard<Mutex> l(data->mutex_); + if (do_startup_check && data->flush_task_active_) { + // The task is already active.
+ return; + } + if (data->flush_mode_ == AUTO_FLUSH_BACKGROUND) { + data->flush_task_active_ = true; + } else { + // Flush mode could change during the operation. If current mode + // is no longer AUTO_FLUSH_BACKGROUND, do not re-schedule the task. + data->flush_task_active_ = false; + return; + } + max_batcher_age = data->flush_interval_; + } + + // Let's measure the age of a batcher as the time elapsed from the moment + // of adding the very first operation into the batcher. + // The idea is to flush the batcher when its age is very close to the + // flush_interval_: let's call it 'batcher flush age'. If current batcher + // hasn't reached its flush age yet, just re-schedule the task to + // re-evaluate the age of then-will-be-current batcher. So, if the current + // batcher is still current at that time, it will be exactly of its flush age. + MonoDelta time_left = data->FlushCurrentBatcher(max_batcher_age); + MonoDelta next_run = time_left.Initialized() ? time_left : max_batcher_age; + + // Re-schedule the task to check and flush the current batcher + // when its age is closer to the flush_interval_. + // Only weak references are bound into the re-scheduled task: binding + // the strong 'messenger' and 'session' pointers would keep both alive + // while the task is pending, and since the task re-schedules itself in + // AUTO_FLUSH_BACKGROUND mode, the session would never be released and + // the weak_session.lock() liveness check above would always succeed. + std::shared_ptr<rpc::Messenger> messenger(weak_messenger.lock()); + if (PREDICT_TRUE(messenger)) { + messenger->ScheduleOnReactor( + boost::bind(&KuduSession::Data::TimeBasedFlushTask, + _1, weak_messenger, weak_session, false), + next_run); + } +} + +int64_t KuduSession::Data::GetPendingOperationsSizeForTests() const { + std::lock_guard<Mutex> l(mutex_); + return buffer_bytes_used_; +} + +size_t KuduSession::Data::GetBatchersCountForTests() const { + std::lock_guard<Mutex> l(mutex_); + return batchers_num_; +} + } // namespace client } // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/session-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h index d839314..e050301 100644 --- a/src/kudu/client/session-internal.h +++ b/src/kudu/client/session-internal.h @@ -17,13 +17,19 @@ #ifndef KUDU_CLIENT_SESSION_INTERNAL_H #define KUDU_CLIENT_SESSION_INTERNAL_H -#include <unordered_set> +#include <memory> #include "kudu/client/client.h" -#include "kudu/util/locks.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" namespace kudu { +namespace rpc { +class Messenger; +} // namespace rpc + namespace client { namespace internal { @@ -31,17 +37,27 @@ class Batcher; class ErrorCollector; } // internal +// This class contains the code to do the heavy-lifting for the +// kudu::KuduSession-related operations. Its interface does not assume +// thread-safety in general, but it's thread-safe regarding the following +// concurrent actions: +// +// * calls to KuduSession::Apply(): +// (there is at most one call at any moment, no concurrency is assumed here). +// +// * activity of the time-based background flush task +// (there is at most one task running at any moment). +// +// * calls from messenger/reactor threads upon completion of the flushed +// operations to the corresponding tablet server +// (there can be multiple of those at any moment). +// class KuduSession::Data { public: - explicit Data(sp::shared_ptr<KuduClient> client); - ~Data(); - - void Init(const sp::shared_ptr<KuduSession>& session); + explicit Data(sp::shared_ptr<KuduClient> client, + std::shared_ptr<rpc::Messenger> messenger); - // Swap in a new Batcher instance, returning the old one in '*old_batcher', unless it is - // NULL. 
- void NewBatcher(const sp::shared_ptr<KuduSession>& session, - scoped_refptr<internal::Batcher>* old_batcher); + void Init(const sp::weak_ptr<KuduSession>& session); // Called by Batcher when a flush has finished. void FlushFinished(internal::Batcher* b); @@ -51,35 +67,162 @@ class KuduSession::Data { // operations. Status Close(bool force); + // Set flush mode for the session. + Status SetFlushMode(FlushMode mode, const sp::weak_ptr<KuduSession>& session); + + // Set external consistency mode for the session. + Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m); + + // Set limit on buffer space consumed by buffered write operations. + Status SetBufferBytesLimit(size_t size); + + // Set buffer flush watermark (in percentage of the total buffer space). + Status SetBufferFlushWatermark(int32_t watermark_pct); + + // Set the interval of the background max-wait flushing (in milliseconds). + Status SetBufferFlushInterval(unsigned int period_ms); + + // Set the limit on maximum number of batchers with pending operations. + Status SetMaxBatchersNum(unsigned int period_ms); + + // Set timeout for write operations, in milliseconds. + void SetTimeoutMillis(int timeout_ms); + + // Initiate flushing of the current batcher and invoke the specified callback + // once the flushing is finished. + void FlushAsync(KuduStatusCallback* cb); + + // Initiate flushing of the current batcher and await until all batchers + // complete flushing. The return value is Status::OK() if none of the + // batchers encountered errors and Status::IOError() otherwise. + Status Flush(); + + // Check whether there are operations not yet sent to tablet servers. + bool HasPendingOperations() const; + bool HasPendingOperationsUnlocked() const; + + // Get total number of buffered operations. + int CountBufferedOperations() const; + + // Initiate flushing of the current batcher if its accumulated operations' + // on-the-wire size has reached the specified watermark. 
The result + // of the asynchronous flushing is reported via the specified callback. + // Even if the callback is null (nullptr), that does not mean the errors + // are dropped on the floor -- in case of an error, corresponding information + // is added into the session's error collector and can be retrieved later. + void FlushCurrentBatcher(int64_t watermark, + KuduStatusCallback* cb); + + // Initiate flushing of the current batcher if it has reached the specified + // age. If the current batcher is present but it hasn't reached + // the specified age yet, just return the amount of time left until it reaches + // the specified age, not flushing the batcher. If the current batcher is + // of the specified age or older, flush the batcher and return uninitialized + // MonoDelta object. If there isn't current batcher, return uninitialized + // MonoDelta object. + MonoDelta FlushCurrentBatcher(const MonoDelta& max_age); + + // Apply a write operation, i.e. push it through the batcher chain. + Status ApplyWriteOp(const sp::weak_ptr<KuduSession>& session, + KuduWriteOperation* write_op); + + // Check and start the time-based flush task in background, if necessary. + void TimeBasedFlushInit(sp::weak_ptr<KuduSession> weak_session); + + // The self-rescheduling task to flush write operations which have been + // accumulating for too long (controlled by flush_interval_). + // This does real work only in case of AUTO_FLUSH_BACKGROUND mode. + // This method is used to initiate/re-initiate the run of the task + // and re-schedule the task from within. The 'do_startup_check' parameter + // must be set to 'true' when the method is called not from the task thread. + static void TimeBasedFlushTask(const Status& status, + std::weak_ptr<rpc::Messenger> weak_messenger, + sp::weak_ptr<KuduSession> weak_session, + bool do_startup_check); + + // Get the total size of pending (i.e. both freshly added and + // in process of being flushed) operations. This method is used by tests only. 
+ int64_t GetPendingOperationsSizeForTests() const; + + // Get the total number of batchers in the session. + // This method is used by tests only. + size_t GetBatchersCountForTests() const; + + // This constant represents a meaningful name for the first argument in + // expressions like FlushCurrentBatcher(1, cbk): this is the watermark + // corresponding to 1 byte of data. This watermark level is the minimum + // possible for a non-empty batcher, so any non-empty batcher will be flushed + // if calling FlushCurrentBatcher() using this watermark. + static const int64_t kWatermarkNonEmptyBatcher = 1; + // The client that this session is associated with. const sp::shared_ptr<KuduClient> client_; - // Lock protecting internal state. - // Note that this lock should not be taken if the thread is already holding - // a Batcher lock. This must be acquired first. - mutable simple_spinlock lock_; + // The reference to the client's messenger (keeping the reference instead of + // declaring friendship to KuduClient and accessing it via the client_). + std::shared_ptr<rpc::Messenger> messenger_; // Buffer for errors. scoped_refptr<internal::ErrorCollector> error_collector_; - // The current batcher being prepared. - scoped_refptr<internal::Batcher> batcher_; - - // Any batchers which have been flushed but not yet finished. - // - // Upon a batch finishing, it will call FlushFinished(), which removes the batcher from - // this set. This set does not hold any reference count to the Batcher, since, while - // the flush is active, the batcher manages its own refcount. The Batcher will always - // call FlushFinished() before it destructs itself, so we're guaranteed that these - // pointers stay valid. - std::unordered_set<internal::Batcher*> flushed_batchers_; - - FlushMode flush_mode_; kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_; // Timeout for the next batch. int timeout_ms_; + // Interval for the max-wait flush background task. 
+ MonoDelta flush_interval_; // protected by mutex_ + + // Whether the flush task is active/scheduled. + bool flush_task_active_; // protected by mutex_ + + // Current flush mode for the session's data. + FlushMode flush_mode_; // protected by mutex_ + + // Mutex for the condition_ member (the condition variable). + // This lock protects variables from simultaneous access: + // batcher- and byte-counting members, data flow control and other variables + // whose modification requires to notify the thread which might be waiting + // on the 'condition_' variable. + mutable Mutex mutex_; + + // Condition variable used by the code which allocates next batcher + // and count bytes used by the buffered write operations. + // I.e., it used by the data flow control logic while applying/adding + // new write operations (based on mutex_). + ConditionVariable condition_; + + // The current batcher being prepared. + scoped_refptr<internal::Batcher> batcher_;// protected by mutex_ + + // Total number of active batchers. Include the current batcher accumulating + // the newly applied operations, and other batchers with not yet flushed + // or flushed but not yet finished operations. + size_t batchers_num_; // protected by mutex_ + + // Limit on the number of active batchers. + size_t batchers_num_limit_; + + // Session-wide limit on total size of buffer used by all batched write + // operations. The buffer is a virtual entity: there isn't contiguous place + // in the memory which would contain that 'buffered' data. Instead, buffer's + // data is spread across all pending operations in all active batchers. + size_t buffer_bytes_limit_; + + // The high-watermark level as the percentage of the buffer space used by + // freshly added (not-yet-scheduled-for-flush) write operations. + // Once the level is reached, the BackgroundFlusher triggers flushing + // of accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode. 
+ int32_t buffer_watermark_pct_; + + // The total number of bytes used by buffered write operations. + int64_t buffer_bytes_used_; // protected by mutex_ + + private: + FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks); + + bool buffer_pre_flush_enabled_; // Set to 'false' only in test scenarios. + DISALLOW_COPY_AND_ASSIGN(Data); }; http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/write_op.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/write_op.cc b/src/kudu/client/write_op.cc index 25891d9..bfb9550 100644 --- a/src/kudu/client/write_op.cc +++ b/src/kudu/client/write_op.cc @@ -41,8 +41,8 @@ RowOperationsPB_Type ToInternalWriteType(KuduWriteOperation::Type type) { KuduWriteOperation::KuduWriteOperation(const shared_ptr<KuduTable>& table) : table_(table), - row_(table->schema().schema_) { -} + row_(table->schema().schema_), + size_in_buffer_(0) {} KuduWriteOperation::~KuduWriteOperation() {} @@ -59,6 +59,11 @@ EncodedKey* KuduWriteOperation::CreateKey() const { } int64_t KuduWriteOperation::SizeInBuffer() const { + if (size_in_buffer_ > 0) { + // Once computed, the raw size of the operation is cached and returned + // for all subsequent calls. 
+ return size_in_buffer_; + } const Schema* schema = row_.schema(); int size = 1; // for the operation type @@ -78,6 +83,7 @@ int64_t KuduWriteOperation::SizeInBuffer() const { } } } + size_in_buffer_ = size; return size; } http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/write_op.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h index b534f1d..16fe5af 100644 --- a/src/kudu/client/write_op.h +++ b/src/kudu/client/write_op.h @@ -109,9 +109,12 @@ class KUDU_EXPORT KuduWriteOperation { const KuduTable* table() const { return table_.get(); } // Return the number of bytes required to buffer this operation, - // including direct and indirect data. + // including direct and indirect data. Once called, the result is cached + // so subsequent calls will return the size previously computed. int64_t SizeInBuffer() const; + mutable int64_t size_in_buffer_; + DISALLOW_COPY_AND_ASSIGN(KuduWriteOperation); };
