This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 25d7173  [fuzz-itest] add WAL GCs
25d7173 is described below

commit 25d71738c05972eba1383a9b93883a7f5617b7c9
Author: Andrew Wong <[email protected]>
AuthorDate: Tue Aug 17 22:07:07 2021 -0700

    [fuzz-itest] add WAL GCs
    
    In the past we've had bugs that only surface when WALs were GCed.
    Additionally, implementing certain changes to the tablet key-value store
    typically must also account for being able to correctly bootstrap from
    the WALs. To facilitate testing such changes, this patch adds a WAL GC
    op to the fuzz test.
    
    This also adds a bytes equivalent of the --log_segment_size_mb flag, to
    speed up the generation of WAL segments.
    
    I had to add some error handling to gracefully handle writes that
    collide with the destruction of the server.
    
    Change-Id: Iabd9fe33f9fa21f62b513d7607bf62b6fce6bb59
    Reviewed-on: http://gerrit.cloudera.org:8080/17787
    Tested-by: Andrew Wong <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/consensus_queue.cc    |  5 +++++
 src/kudu/consensus/log.cc                | 12 +++++++++++-
 src/kudu/integration-tests/fuzz-itest.cc | 21 ++++++++++++++++++++-
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index eecaecb..1ce9da0 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -366,6 +366,11 @@ void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
 void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
                                                const StatusCallback& callback,
                                                const Status& status) {
+  // If the server is being destructed, the log may be unavailable.
+  if (PREDICT_FALSE(status.IsServiceUnavailable())) {
+    callback(Status::OK());
+    return;
+  }
   CHECK_OK(status);
 
   // Fake an RPC response from the local peer.
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index e94b36c..ea24555 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <type_traits>
 #include <utility>
 
 #include <boost/range/adaptor/reversed.hpp>
@@ -142,6 +143,12 @@ DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
 
+DEFINE_int32(log_segment_size_bytes_for_tests, 0,
+             "The size for log segments, in bytes. This takes precedence over "
+             "--log_segment_size_mb in cases where significantly smaller segments are desired. "
+             "If non-positive, --log_segment_size_mb is honored.");
+TAG_FLAG(log_segment_size_bytes_for_tests, unsafe);
+
 // Other flags.
 // -----------------------------
 DEFINE_int64(fs_wal_dir_reserved_bytes, -1,
@@ -435,7 +442,10 @@ SegmentAllocator::SegmentAllocator(const LogOptions* opts,
                                    uint32_t schema_version)
     : opts_(opts),
       ctx_(ctx),
-      max_segment_size_(opts_->segment_size_mb * 1024 * 1024),
+      max_segment_size_(
+          FLAGS_log_segment_size_bytes_for_tests > 0
+              ? FLAGS_log_segment_size_bytes_for_tests
+              : opts_->segment_size_mb * 1024 * 1024),
       schema_(std::move(schema)),
       schema_version_(schema_version),
       sync_disabled_(false) {}
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 225c0c0..9eaeee4 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -78,6 +78,7 @@
 
 DEFINE_int32(keyspace_size, 5,  "number of distinct primary keys to test with");
 DEFINE_int32(max_open_txns, 5,  "maximum number of open transactions to test with");
+DECLARE_int32(log_segment_size_bytes_for_tests);
 DECLARE_bool(enable_txn_partition_lock);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
@@ -142,6 +143,7 @@ enum TestOpType {
   TEST_BEGIN_TXN,
   TEST_COMMIT_TXN,
   TEST_ABORT_TXN,
+  TEST_GC_LOG,
   TEST_NUM_OP_TYPES // max value for enum
 };
 
@@ -169,12 +171,20 @@ const char* TestOpType_names[] = {
   "TEST_BEGIN_TXN",
   "TEST_COMMIT_TXN",
   "TEST_ABORT_TXN",
+  "TEST_GC_LOG",
 };
 constexpr const int kNoTxnId = -1;
 // Identical to kNoTxnId but more generic-sounding for ops that don't use
 // transaction IDs.
 constexpr const int kNoVal = -1;
 
+// To facilitate WAL GC, we'll make our segment size very small.
+//
+// TODO(awong): implement a rollover op. It's a bit tricky today, given how we
+// allocate and append to segments in background threads, which may both race
+// with a naive rollover implementation.
+constexpr const int kLowSegmentSizeBytes = 1024;
+
 // An operation in a fuzz-test sequence.
 struct TestOp {
   // NOLINTNEXTLINE(google-explicit-constructor)
@@ -207,6 +217,7 @@ struct TestOp {
       case TEST_MAJOR_COMPACT_DELTAS:
       case TEST_MINOR_COMPACT_DELTAS:
       case TEST_RESTART_TS:
+      case TEST_GC_LOG:
         return strings::Substitute("{$0}", TestOpType_names[type]);
       case TEST_FLUSH_OPS:
       case TEST_UPSERT:
@@ -282,7 +293,8 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_DIFF_SCAN,
                                   TEST_BEGIN_TXN,
                                   TEST_COMMIT_TXN,
-                                  TEST_ABORT_TXN};
+                                  TEST_ABORT_TXN,
+                                  TEST_GC_LOG};
 
 // Ops that focus on hammering workloads in which rows come in and out of
 // existence.
@@ -316,6 +328,7 @@ class FuzzTest : public KuduTest {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_hybrid_clock = false;
     FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true;
+    FLAGS_log_segment_size_bytes_for_tests = kLowSegmentSizeBytes;
     // The scenarios of this test do not assume using the standard control path
     // for txn-enabled write operations.
     FLAGS_tserver_txn_write_op_handling_enabled = false;
@@ -1185,6 +1198,9 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         op_timestamps++;
         break;
       }
+      case TEST_GC_LOG:
+        ops->emplace_back(TEST_GC_LOG);
+        break;
       default:
         LOG(FATAL) << "Invalid op type: " << r;
     }
@@ -1486,6 +1502,9 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
         txn_sessions_.erase(txn_id);
         break;
       }
+      case TEST_GC_LOG:
+        ASSERT_OK(tablet_replica_->RunLogGC());
+        break;
       default:
         LOG(FATAL) << test_op.type;
     }

Reply via email to