This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a85c330  Revert "Revert "KUDU-2356. Idle WALs should not consume significant memory""
a85c330 is described below

commit a85c3302abf935457e4d0f9d5326f0b612e0b614
Author: Todd Lipcon <[email protected]>
AuthorDate: Wed Mar 21 16:20:37 2018 -0700

    Revert "Revert "KUDU-2356. Idle WALs should not consume significant memory""
    
    Prior to this patch, each WAL writer kept its temporary compress_buf_
    sized to fit the largest write it had ever handled, even if the tablet
    was idle. Keeping the buffer around for a hot WAL probably has a small
    performance benefit, but when the WAL is idle enough to shut down its
    append thread, it should also free the buffer rather than needlessly
    occupying memory.
    
    Revert notes:
    The original patch was reverted because this new behavior caused TSAN
    failures. They were caused by a thread-unsafe interleaving of the append
    thread going idle (and thus the active segment going idle) with a
    separate thread calling AllocateSegmentAndRollOver() (a method used only
    in tests), which swaps in a new active segment.
    
    The race can be illustrated with the following sequence of events, with
    the append thread (P), allocation thread (L), and test thread (T).
    
    T: writes a batch to the log asynchronously, waking up the append thread
    P: handles the batch
    P: after a while, there's no work to be done
    P: sets the thread state to IDLE
    T: calls AllocateSegmentAndRollOver()
    L: asynchronously allocates a new segment file
    T: synchronously waits for the allocation to finish
    T: sets the active segment to the newly allocated segment
    P: simultaneously calls active_segment->GoIdle()
    
    The last two steps here are what race with each other. The original
    change assumed that the append thread was the only thread operating on
    the active segment, which is true in production but not in some tests.
    
    Since the race only occurs when 1) the active segment is going idle and
    2) a separate thread is concurrently switching active segments, I've
    added a lock that serializes these two operations. There may be a more
    elegant, less ad-hoc solution, but the new lock is the smallest amount
    of concurrency control I could think of that fixes the race.
    
    AllocateSegmentAndRollOver() is used in tablet_copy-test-base.h and
    log-test-base.h. Without the new lock, tablet_copy_client-test failed
    16/100 times in TSAN mode; with it, it passed 300/300 times. Similarly,
    log-test failed 106/300 times without the lock and passed 300/300 times
    with it.
    
    Change-Id: I571a7cb3d310687e6e22bbd547e51a2ea81b8806
    Reviewed-on: http://gerrit.cloudera.org:8080/9747
    Tested-by: Todd Lipcon <[email protected]>
    Reviewed-by: Adar Dembo <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/14554
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log-test-base.h |  3 +--
 src/kudu/consensus/log-test.cc     | 10 ++++++++++
 src/kudu/consensus/log.cc          |  7 +++++++
 src/kudu/consensus/log.h           | 15 +++++++++++----
 src/kudu/consensus/log_util.cc     |  4 ++++
 src/kudu/consensus/log_util.h      |  7 +++++++
 6 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 89b39d0..f23b696 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -347,8 +347,7 @@ class LogTestBase : public KuduTest {
   }
 
   Status RollLog() {
-    RETURN_NOT_OK(log_->AsyncAllocateSegment());
-    return log_->RollOver();
+    return log_->AllocateSegmentAndRollOver();
   }
 
   std::string DumpSegmentsToString(const SegmentSequence& segments) {
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index e33a189..dfb4f62 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -51,7 +51,9 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/compression/compression.pb.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
 #include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -1135,11 +1137,19 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) {
   ASSERT_EVENTUALLY([&]() {
       AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2);
       ASSERT_TRUE(log_->append_thread_active_for_tests());
+      debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+      ASSERT_GT(log_->active_segment_->compress_buf_.capacity(), 
faststring::kInitialCapacity);
     });
   // After some time, the append thread should shut itself down.
   ASSERT_EVENTUALLY([&]() {
       ASSERT_FALSE(log_->append_thread_active_for_tests());
     });
+
+  // The log should free its buffer once it is idle.
+  {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+    ASSERT_EQ(faststring::kInitialCapacity, 
log_->active_segment_->compress_buf_.capacity());
+  }
 }
 
 // Test that Log::TotalSize() captures creation, addition, and deletion of log 
segments.
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index df3ac4b..27ec91e 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -292,6 +292,11 @@ void Log::AppendThread::Wake() {
   }
 }
 
+void Log::SetActiveSegmentIdle() {
+  std::lock_guard<rw_spinlock> l(segment_idle_lock_);
+  active_segment_->GoIdle();
+}
+
 bool Log::AppendThread::GoIdle() {
   // Inject latency at key points in this function for the purposes of tests.
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
@@ -348,6 +353,7 @@ void Log::AppendThread::ProcessQueue() {
     }
     HandleBatches(std::move(entry_batches));
   }
+  log_->SetActiveSegmentIdle();
   VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
 }
 
@@ -726,6 +732,7 @@ void Log::UpdateFooterForBatch(LogEntryBatch* batch) {
 }
 
 Status Log::AllocateSegmentAndRollOver() {
+  std::lock_guard<rw_spinlock> l(segment_idle_lock_);
   RETURN_NOT_OK(AsyncAllocateSegment());
   return RollOver();
 }
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 1df3a20..269237b 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -152,10 +152,6 @@ class Log : public RefCountedThreadSafe<Log> {
     max_segment_size_ = max_segment_size;
   }
 
-  void DisableAsyncAllocationForTests() {
-    options_.async_preallocate_segments = false;
-  }
-
   void DisableSync() {
     sync_disabled_ = true;
   }
@@ -225,6 +221,8 @@ class Log : public RefCountedThreadSafe<Log> {
   // Forces the Log to allocate a new segment and roll over.
   // This can be used to make sure all entries appended up to this point are
   // available in closed, readable segments.
+  //
+  // This is not thread-safe. Used in test only.
   Status AllocateSegmentAndRollOver();
 
   // Returns this Log's FsManager.
@@ -244,6 +242,7 @@ class Log : public RefCountedThreadSafe<Log> {
   FRIEND_TEST(LogTestOptionalCompression, TestMultipleEntriesInABatch);
   FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
   FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
+  FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
 
   class AppendThread;
 
@@ -271,6 +270,9 @@ class Log : public RefCountedThreadSafe<Log> {
   // Make segments roll over.
   Status RollOver();
 
+  // Sets that the current active segment is idle.
+  void SetActiveSegmentIdle();
+
   static Status CreateBatchFromPB(LogEntryTypePB type,
                                   std::unique_ptr<LogEntryBatchPB> 
entry_batch_pb,
                                   std::unique_ptr<LogEntryBatch>* entry_batch);
@@ -399,6 +401,11 @@ class Log : public RefCountedThreadSafe<Log> {
   // The status of the most recent log-allocation action.
   Promise<Status> allocation_status_;
 
+  // Protects the active segment as it is going idle, in case other threads
+  // attempt to switch segments concurrently. This shouldn't happen in
+  // production, but may happen if AllocateSegmentAndRollOver() is called.
+  mutable rw_spinlock segment_idle_lock_;
+
   // Read-write lock to protect 'allocation_state_'.
   mutable RWMutex allocation_lock_;
   SegmentAllocationState allocation_state_;
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 27c143b..43477db 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -844,6 +844,10 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data,
   return Status::OK();
 }
 
+void WritableLogSegment::GoIdle() {
+  compress_buf_.clear();
+  compress_buf_.shrink_to_fit();
+}
 
 unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
     const vector<consensus::ReplicateRefPtr>& msgs) {
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 9d504f3..8edec1d 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -403,6 +403,8 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 };
 
 // A writable log segment where state data is stored.
+//
+// This class is not thread-safe.
 class WritableLogSegment {
  public:
   WritableLogSegment(std::string path,
@@ -434,6 +436,10 @@ class WritableLogSegment {
     return writable_file_->Sync();
   }
 
+  // Indicate that the segment has not been written for some period of time.
+  // In this case, temporary buffers should be freed up.
+  void GoIdle();
+
   // Returns true if the segment header has already been written to disk.
   bool IsHeaderWritten() const {
     return is_header_written_;
@@ -467,6 +473,7 @@ class WritableLogSegment {
   }
 
  private:
+  FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
 
   const std::shared_ptr<WritableFile>& writable_file() const {
     return writable_file_;

Reply via email to