This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 524ed47e3f56fdb90f384fd8454efe179d5aa920 Author: Andrew Wong <[email protected]> AuthorDate: Tue Oct 15 22:13:24 2019 -0700 log: rename a couple of things for clarity This patch: - renames Log::DoAppend() to Log::WriteBatch(), - renames AppendThread::HandleGroup() to AppendThread::HandleBatches(), - renames AppendThread::WorkerState to AppendThread::ThreadState, to homogenize the terminology used in that class, - renames AppendThread::{WORKER_STOPPED,WORKER_ACTIVE} to IDLE and ACTIVE respectively I think these names make it slightly more obvious what the functions are doing. There are no functional changes here. Change-Id: I9fcaf2656bc31e5015c285392029fa98f6137cef Reviewed-on: http://gerrit.cloudera.org:8080/14477 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/consensus/log.cc | 72 ++++++++++++++++++++++++----------------------- src/kudu/consensus/log.h | 2 +- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index 7a95afa..df3ac4b 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -193,16 +193,18 @@ using strings::Substitute; // // The design of submitting tasks to the threadpool is slightly tricky in order // to achieve group commit and not have to submit one task per appended batch. -// Instead, a generic 'DoWork()' task is used which loops collecting work until -// it finds that it has been idle for a while, at which point the task finishes. +// Instead, a generic 'ProcessQueue()' task is used which loops collecting +// batches to write until it finds that the queue has been empty for a while, +// at which point the task finishes. // // The trick, then, lies in two areas: // -// 1) after appending a batch, we need to ensure that a task is already running, -// and if not, start one. This is done in Wake(). +// 1) After adding a batch to the queue, we need to ensure that a task is +// already running, and if not, start one. 
This is done in Wake(). // -// 2) when the task finds no more work to do and wants to go idle, it needs to -// ensure that it doesn't miss a concurrent wake-up. This is done in GoIdle(). +// 2) When the task finds no more batches to write and wants to go idle, it +// needs to ensure that it doesn't miss any concurrent additions to the queue. +// This is done in GoIdle(). // // See the implementation comments in Wake() and GoIdle() for details. class Log::AppendThread { @@ -224,15 +226,15 @@ class Log::AppendThread { void Wake(); bool active() const { - return base::subtle::NoBarrier_Load(&worker_state_) == WORKER_ACTIVE; + return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE; } private: // The task submitted to the threadpool which collects batches from the queue // and appends them, until it determines that the queue is idle. - void DoWork(); + void ProcessQueue(); - // Tries to transition back to WORKER_STOPPED state. If successful, returns true. + // Tries to transition back to IDLE state. If successful, returns true. // // Otherwise, returns false to indicate that the task should keep running because // a new task was enqueued just as we were trying to go idle. @@ -240,21 +242,21 @@ class Log::AppendThread { // Handle the actual appending of a group of entries. Responsible for deleting the // LogEntryBatch* pointers. - void HandleGroup(vector<LogEntryBatch*> entry_batches); + void HandleBatches(vector<LogEntryBatch*> entry_batches); string LogPrefix() const; Log* const log_; - // Atomic state machine for whether there is any worker task currently - // queued or running on append_pool_. See Wake() and GoIdle() for more details. - enum WorkerState { - // No worker task is queued or running. - WORKER_STOPPED, - // A worker task is queued or running. - WORKER_ACTIVE + // Atomic state machine for whether there is any task currently queued or + // running on append_pool_. See Wake() and GoIdle() for more details. 
+ enum ThreadState { + // No task is queued or running. + IDLE, + // A task is queued or running. + ACTIVE }; - Atomic32 worker_state_ = WORKER_STOPPED; + Atomic32 thread_state_ = IDLE; // Pool with a single thread, which handles shutting down the thread // when idle. @@ -284,9 +286,9 @@ Status Log::AppendThread::Init() { void Log::AppendThread::Wake() { DCHECK(append_pool_); auto old_status = base::subtle::NoBarrier_CompareAndSwap( - &worker_state_, WORKER_STOPPED, WORKER_ACTIVE); - if (old_status == WORKER_STOPPED) { - CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::DoWork, Unretained(this)))); + &thread_state_, IDLE, ACTIVE); + if (old_status == IDLE) { + CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this)))); } } @@ -306,19 +308,19 @@ bool Log::AppendThread::GoIdle() { // So, we first transition back to STOPPED state, and then re-check to see // if there has been something enqueued in the meantime. - auto old_state = base::subtle::NoBarrier_AtomicExchange(&worker_state_, WORKER_STOPPED); - DCHECK_EQ(old_state, WORKER_ACTIVE); + auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE); + DCHECK_EQ(old_state, ACTIVE); if (log_->entry_queue()->empty()) { // Nothing got enqueued, which means there must not have been any missed wakeup. - // We are now in WORKER_STOPPED state. + // We are now in IDLE state. return true; } MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms); // Someone enqueued something. We don't know whether their wakeup was successful // or not, but we can just try to transition back to ACTIVE mode here. - if (base::subtle::NoBarrier_CompareAndSwap(&worker_state_, WORKER_STOPPED, WORKER_ACTIVE) - == WORKER_STOPPED) { + if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE) + == IDLE) { // Their wake-up was lost, but we've now marked ourselves as running. 
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms); return false; @@ -330,8 +332,8 @@ bool Log::AppendThread::GoIdle() { return true; } -void Log::AppendThread::DoWork() { - DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(worker_state_), WORKER_ACTIVE); +void Log::AppendThread::ProcessQueue() { + DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE); VLOG_WITH_PREFIX(2) << "WAL Appender going active"; while (true) { MonoTime deadline = MonoTime::Now() + @@ -344,12 +346,12 @@ void Log::AppendThread::DoWork() { if (GoIdle()) break; continue; } - HandleGroup(std::move(entry_batches)); + HandleBatches(std::move(entry_batches)); } VLOG_WITH_PREFIX(2) << "WAL Appender going idle"; } -void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) { +void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) { if (log_->metrics_) { log_->metrics_->entry_batches_per_group->Increment(entry_batches.size()); } @@ -360,7 +362,7 @@ void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) { bool is_all_commits = true; for (LogEntryBatch* entry_batch : entry_batches) { TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch); - Status s = log_->DoAppend(entry_batch); + Status s = log_->WriteBatch(entry_batch); if (PREDICT_FALSE(!s.ok())) { LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString(); // TODO(af): If a single transaction fails to append, should we @@ -630,12 +632,12 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg, return Status::OK(); } -Status Log::DoAppend(LogEntryBatch* entry_batch) { +Status Log::WriteBatch(LogEntryBatch* entry_batch) { size_t num_entries = entry_batch->count(); - DCHECK_GT(num_entries, 0) << "Cannot call DoAppend() with zero entries reserved"; + DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved"; MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_append_fraction, - Status::IOError("Injected IOError in 
Log::DoAppend()")); + Status::IOError("Injected IOError in Log::WriteBatch()")); Slice entry_batch_data = entry_batch->data(); uint32_t entry_batch_bytes = entry_batch->total_size_bytes(); @@ -804,7 +806,7 @@ Status Log::Append(LogEntryPB* entry) { entry_batch_pb->mutable_entry()->AddAllocated(entry); LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1); entry_batch.Serialize(); - Status s = DoAppend(&entry_batch); + Status s = WriteBatch(&entry_batch); if (s.ok()) { s = Sync(); } diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h index 34ffe15..1df3a20 100644 --- a/src/kudu/consensus/log.h +++ b/src/kudu/consensus/log.h @@ -300,7 +300,7 @@ class Log : public RefCountedThreadSafe<Log> { // Writes serialized contents of 'entry' to the log. Called inside // AppenderThread. - Status DoAppend(LogEntryBatch* entry_batch); + Status WriteBatch(LogEntryBatch* entry_batch); // Update footer_builder_ to reflect the log indexes seen in 'batch'. void UpdateFooterForBatch(LogEntryBatch* batch);
