This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 930fe5bf8f02f97061bddcfa4199ab5a25f15419 Author: Andrew Wong <[email protected]> AuthorDate: Tue Feb 23 17:28:31 2021 -0800 test: add more natural test for KUDU-2233 I have a patch in-flight that touches an area of the code around where we expect the infamous KUDU-2233 crash. Before merging it, I'd like to ensure the graceful handling of this corruption is unaffected, especially in cases where data has previously been corrupted and we've just upgraded to a newer version of Kudu. This patch adds such a test case, where data is corrupted but does not result in a crash, and the tserver is restarted with bits that handle corruption gracefully. In doing so, this patch also adds an off switch to all of the guardrails we installed for KUDU-2233. Change-Id: Icac3ad0a1b6bb9b17d5b6a901dc5bba79c0840fa Reviewed-on: http://gerrit.cloudera.org:8080/17114 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Andrew Wong <[email protected]> --- src/kudu/integration-tests/test_workload.cc | 6 +- src/kudu/integration-tests/test_workload.h | 6 +- .../timestamp_advancement-itest.cc | 131 +++++++++++++++++++-- src/kudu/tablet/compaction.cc | 35 ++++-- src/kudu/tablet/tablet.cc | 7 +- src/kudu/tablet/tablet_bootstrap.cc | 4 +- src/kudu/tablet/tablet_replica.cc | 6 +- 7 files changed, 173 insertions(+), 22 deletions(-) diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc index 5cc6bdd..f646a70 100644 --- a/src/kudu/integration-tests/test_workload.cc +++ b/src/kudu/integration-tests/test_workload.cc @@ -161,7 +161,8 @@ void TestWorkload::WriteThread() { unique_ptr<KuduInsert> insert(table->NewInsert()); KuduPartialRow* row = insert->mutable_row(); int32_t key; - if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) { + if (write_pattern_ == INSERT_SEQUENTIAL_ROWS || + write_pattern_ == INSERT_SEQUENTIAL_ROWS_WITH_DELETE) { key = sequential_key_gen_.Increment(); } else { key = rng_.Next(); @@ -195,7 +196,8 @@ void 
TestWorkload::WriteThread() { SleepFor(MonoDelta::FromMilliseconds(write_interval_millis_)); } // Write delete row to cluster. - if (write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE) { + if (write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE || + write_pattern_ == INSERT_SEQUENTIAL_ROWS_WITH_DELETE) { for (auto key : keys) { unique_ptr<KuduDelete> op(table->NewDelete()); KuduPartialRow* row = op->mutable_row(); diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h index f93cb58..fb86b76 100644 --- a/src/kudu/integration-tests/test_workload.h +++ b/src/kudu/integration-tests/test_workload.h @@ -212,7 +212,10 @@ class TestWorkload { // Insert sequential rows. // This causes flushes but no compactions. - INSERT_SEQUENTIAL_ROWS + INSERT_SEQUENTIAL_ROWS, + + // Insert sequential rows, then delete them. + INSERT_SEQUENTIAL_ROWS_WITH_DELETE, }; void set_write_pattern(WritePattern pattern) { @@ -225,6 +228,7 @@ class TestWorkload { case INSERT_RANDOM_ROWS_WITH_DELETE: case UPDATE_ONE_ROW: case INSERT_SEQUENTIAL_ROWS: + case INSERT_SEQUENTIAL_ROWS_WITH_DELETE: set_already_present_allowed(false); break; default: LOG(FATAL) << "Unsupported WritePattern."; diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc index cc161cc..3e782d9 100644 --- a/src/kudu/integration-tests/timestamp_advancement-itest.cc +++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc @@ -52,6 +52,7 @@ #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/mini-cluster/mini_cluster.h" #include "kudu/rpc/rpc_controller.h" +#include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/mvcc.h" #include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_replica.h" @@ -68,13 +69,22 @@ #include "kudu/util/test_util.h" DECLARE_bool(enable_maintenance_manager); +DECLARE_bool(compaction_force_small_rowset_tradeoff); +DECLARE_bool(prevent_kudu_2233_corruption); 
DECLARE_bool(log_preallocate_segments); DECLARE_bool(log_async_preallocate_segments); DECLARE_bool(raft_enable_pre_election); +DECLARE_double(compaction_minimum_improvement); DECLARE_double(leader_failure_max_missed_heartbeat_periods); DECLARE_int32(consensus_inject_latency_ms_in_notifications); DECLARE_int32(heartbeat_interval_ms); DECLARE_int32(raft_heartbeat_interval_ms); +DECLARE_int32(tablet_compaction_budget_mb); +DECLARE_int32(tablet_history_max_age_sec); + +#ifndef NDEBUG +DECLARE_bool(dcheck_on_kudu_2233_invariants); +#endif namespace kudu { @@ -106,7 +116,10 @@ class TimestampAdvancementITest : public MiniClusterITestBase { // Sets up a cluster and returns the tablet replica on 'ts' that has written // to its WAL. 'replica' will write further messages to a new WAL segment. - void SetupClusterWithWritesInWAL(int ts, scoped_refptr<TabletReplica>* replica) { + // If 'delete_and_reinsert' is set to true, rows will be flushed, deleted, + // reinserted, and flushed again. + void SetupClusterWithWritesInWAL(int ts, bool delete_and_reinsert, + scoped_refptr<TabletReplica>* replica) { // We're going to manually trigger maintenance ops, so disable maintenance op // scheduling. FLAGS_enable_maintenance_manager = false; @@ -124,26 +137,47 @@ class TimestampAdvancementITest : public MiniClusterITestBase { // Set a low batch size so we have finer-grained control over flushing of // the WAL. Too large, and the WAL may end up flushing in the background. write.set_write_batch_size(1); + // Certain corruptions only happen when there are deletes and reinserts, so + // perform those ops here and below. 
+ if (delete_and_reinsert) { + write.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS_WITH_DELETE); + } write.Setup(); write.Start(); - while (write.rows_inserted() < 10) { + table_name_ = write.table_name(); + while (write.batches_completed() < 10) { SleepFor(MonoDelta::FromMilliseconds(1)); } write.StopAndJoin(); - + auto batches_completed = write.batches_completed(); + scoped_refptr<TabletReplica> tablet_replica = tablet_replica_on_ts(ts); + if (delete_and_reinsert) { + // Flush so we get a disk rowset with our deleted rows, before inserting + // to a new rowset. + ASSERT_OK(tablet_replica->tablet()->Flush()); + TestWorkload write(cluster_.get()); + write.set_write_batch_size(1); + write.set_table_name(table_name_); + write.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + write.Setup(); + write.Start(); + while (write.batches_completed() < 10) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + write.StopAndJoin(); + batches_completed += write.batches_completed(); + } // Ensure that the replicas eventually get to a point where all of them // have all the rows. This will allow us to GC the WAL, as they will not // need to retain them if fully-replicated. - scoped_refptr<TabletReplica> tablet_replica = tablet_replica_on_ts(ts); ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), - ts_map_, tablet_replica->tablet_id(), write.batches_completed())); + ts_map_, tablet_replica->tablet_id(), batches_completed)); // Flush the current log batch and roll over to get a fresh WAL segment. ASSERT_OK(tablet_replica->log()->WaitUntilAllFlushed()); ASSERT_OK(tablet_replica->log()->AllocateSegmentAndRollOverForTests()); - - // Also flush the MRS so we're free to GC the WAL segment we just wrote. 
ASSERT_OK(tablet_replica->tablet()->Flush()); + *replica = std::move(tablet_replica); } @@ -244,8 +278,87 @@ class TimestampAdvancementITest : public MiniClusterITestBase { TServerDetails* ts_details = FindOrDie(ts_map_, tserver->uuid()); return WaitUntilTabletRunning(ts_details, tablet_id, kTimeout); } + protected: + string table_name_; }; +// Simulate being on an older version of Kudu and set up the conditions for +// KUDU-2233, where there are no writes in the WAL, and perform a compaction. +// Then, "upgrade" to a more recent version that handles KUDU-2233 corruption, +// and ensure that compacting corrupted data is handled gracefully. +TEST_F(TimestampAdvancementITest, TestUpgradeFromOlderCorruptedData) { + FLAGS_prevent_kudu_2233_corruption = false; + // Set a low Raft heartbeat interval so we can inject churn elections. + FLAGS_raft_heartbeat_interval_ms = 100; + + // This test case starts out with two overlapping rowsets, and to witness + // corruption in a way that causes compaction failures post-upgrade, we'll + // need to KUDU-2233-compact just one of them. Do so by reducing the + // compaction budget, and allowing small minimum improvement scores. + FLAGS_tablet_compaction_budget_mb = 1; + FLAGS_compaction_force_small_rowset_tradeoff = true; + FLAGS_compaction_minimum_improvement = -1; + + // Setup a cluster with some writes and a new WAL segment. + scoped_refptr<TabletReplica> replica; + NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/true, &replica)); + + MiniTabletServer* ts = tserver(kTserver); + const string tablet_id = replica->tablet_id(); + + // Now that we're on a new WAL segment, inject latency to consensus so we + // trigger elections, and wait for some terms to pass. 
+ FLAGS_raft_enable_pre_election = false; + FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0; + FLAGS_consensus_inject_latency_ms_in_notifications = 100; + const int64_t kNumExtraTerms = 10; + int64_t initial_raft_term = replica->consensus()->CurrentTerm(); + int64_t raft_term = initial_raft_term; + while (raft_term < initial_raft_term + kNumExtraTerms) { + SleepFor(MonoDelta::FromMilliseconds(10)); + raft_term = replica->consensus()->CurrentTerm(); + } + + // Reduce election churn so we can achieve a stable quorum and get to a point + // where we can GC our logs. Note: we won't GC if there are replicas that + // need to be caught up. + FLAGS_consensus_inject_latency_ms_in_notifications = 0; + NO_FATALS(GCUntilNoWritesInWAL(ts, replica)); + replica.reset(); + ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); + + replica = tablet_replica_on_ts(kTserver); + Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp(); + ASSERT_EQ(Timestamp::kInitialTimestamp, cleantime); + + // Perform a compaction with the low budget so only a single rowset is + // "compacted" (yes, we can compact a single rowset, no it's not natural to + // do so, but it's what we need here). This will ensure that only one of the + // two versions of a given row is corrupted, which should allow us to perform + // a compaction later that fails. + ASSERT_OK(replica->tablet()->Compact(tablet::Tablet::COMPACT_NO_FLAGS)); + + // Now restart the server with more normal flags. Despite disallowing further + // corruptions, when we compact, we should still be left with an error, as + // our data has been corrupted on a simulated older version. + replica.reset(); + FLAGS_prevent_kudu_2233_corruption = true; + FLAGS_tablet_compaction_budget_mb = 128; + // Don't crash in the event of broken invariants though, since we're working + // with corrupted data here. 
+#ifndef NDEBUG + FLAGS_dcheck_on_kudu_2233_invariants = false; +#endif + ASSERT_OK(ShutdownAllNodesAndRestartTserver(ts, tablet_id)); + + replica = tablet_replica_on_ts(kTserver); + Status s = replica->tablet()->Compact(tablet::Tablet::COMPACT_NO_FLAGS); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(tablet::TabletStatePB::FAILED, replica->state()); + }); +} + // Test that bootstrapping a Raft no-op from the WAL will advance the replica's // MVCC clean time timestamps. TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { @@ -254,7 +367,7 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { // Setup a cluster with some writes and a new WAL segment. scoped_refptr<TabletReplica> replica; - NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica)); + NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/false, &replica)); MiniTabletServer* ts = tserver(kTserver); const string tablet_id = replica->tablet_id(); @@ -296,7 +409,7 @@ TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) { // configs, and the tablet cannot join a quorum. TEST_F(TimestampAdvancementITest, Kudu2463Test) { scoped_refptr<TabletReplica> replica; - NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica)); + NO_FATALS(SetupClusterWithWritesInWAL(kTserver, /*delete_and_reinsert=*/false, &replica)); MiniTabletServer* ts = tserver(kTserver); const string tablet_id = replica->tablet_id(); diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index a4a8258..07c7e28 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -74,10 +74,19 @@ using std::unordered_set; using std::vector; using strings::Substitute; +#ifndef NDEBUG +DEFINE_bool(dcheck_on_kudu_2233_invariants, true, + "Whether to DCHECK on broken invariants caused by KUDU-2233. 
" + "Used in tests only when NDEBUG is not defined!"); +TAG_FLAG(dcheck_on_kudu_2233_invariants, unsafe); +TAG_FLAG(dcheck_on_kudu_2233_invariants, hidden); +#endif + DEFINE_double(tablet_inject_kudu_2233, 0, "Fraction of the time that compactions that merge the history " "of a single row spread across multiple rowsets will return " "with a corruption status"); +TAG_FLAG(tablet_inject_kudu_2233, unsafe); TAG_FLAG(tablet_inject_kudu_2233, hidden); namespace kudu { @@ -300,15 +309,23 @@ int CompareDuplicatedRows(const CompactionInputRow& left, AdvanceToLastInList(&right_last); if (left.redo_head == nullptr) { +#ifndef NDEBUG // left must still be alive, meaning right must have at least a DELETE redo. - DCHECK(right_last != nullptr); - DCHECK(right_last->changelist().is_delete()); + if (PREDICT_TRUE(FLAGS_dcheck_on_kudu_2233_invariants)) { + DCHECK(right_last != nullptr); + DCHECK(right_last->changelist().is_delete()); + } +#endif return 1; } if (right.redo_head == nullptr) { +#ifndef NDEBUG // right must still be alive, meaning left must have at least a DELETE redo. - DCHECK(left_last != nullptr); - DCHECK(left_last->changelist().is_delete()); + if (PREDICT_TRUE(FLAGS_dcheck_on_kudu_2233_invariants)) { + DCHECK(left_last != nullptr); + DCHECK(left_last->changelist().is_delete()); + } +#endif return -1; } @@ -808,9 +825,13 @@ Status MergeDuplicatedRowHistory(const string& tablet_id, &previous_ghost->row)); // We should be left with only one redo, the delete. 
- DCHECK(pv_delete_redo != nullptr); - DCHECK(pv_delete_redo->changelist().is_delete()); - DCHECK(pv_delete_redo->next() == nullptr); +#ifndef NDEBUG + if (PREDICT_TRUE(FLAGS_dcheck_on_kudu_2233_invariants)) { + DCHECK(pv_delete_redo != nullptr); + DCHECK(pv_delete_redo->changelist().is_delete()); + DCHECK(pv_delete_redo->next() == nullptr); + } +#endif if (PREDICT_FALSE( pv_delete_redo == nullptr || !pv_delete_redo->changelist().is_delete() || diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index b2a79ef..6945594 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -97,6 +97,10 @@ #include "kudu/util/trace.h" #include "kudu/util/url-coding.h" +DEFINE_bool(prevent_kudu_2233_corruption, true, + "Whether or not to prevent KUDU-2233 corruptions. Used for testing only!"); +TAG_FLAG(prevent_kudu_2233_corruption, unsafe); + DEFINE_int32(tablet_compaction_budget_mb, 128, "Budget for a single compaction"); TAG_FLAG(tablet_compaction_budget_mb, experimental); @@ -2136,7 +2140,8 @@ Status Tablet::Compact(CompactFlags flags) { void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) { - if (mvcc_.GetCleanTimestamp() == Timestamp::kInitialTimestamp) { + if (mvcc_.GetCleanTimestamp() == Timestamp::kInitialTimestamp && + PREDICT_TRUE(FLAGS_prevent_kudu_2233_corruption)) { KLOG_EVERY_N_SECS(WARNING, 30) << LogPrefix() << "Can't schedule compaction. 
Clean time has " << "not been advanced past its initial value."; stats->set_runnable(false); diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index 55baf55..6795b5a 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -93,6 +93,7 @@ #include "kudu/util/scoped_cleanup.h" #include "kudu/util/stopwatch.h" +DECLARE_bool(prevent_kudu_2233_corruption); DECLARE_int32(group_commit_queue_size_bytes); DEFINE_double(fault_crash_during_log_replay, 0.0, @@ -1151,7 +1152,8 @@ Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB* default: break; } - if (!timestamp_assigned_in_opid_order) { + if (!timestamp_assigned_in_opid_order || + PREDICT_FALSE(!FLAGS_prevent_kudu_2233_corruption)) { return Status::OK(); } diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 7f810cb..d1086a5 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -27,6 +27,7 @@ #include <vector> #include <boost/optional/optional.hpp> +#include <gflags/gflags_declare.h> #include <glog/logging.h> #include "kudu/common/common.pb.h" @@ -104,6 +105,8 @@ METRIC_DEFINE_gauge_uint64(tablet, live_row_count, "Tablet Live Row Count", "Number of live rows in this tablet, excludes deleted rows.", kudu::MetricLevel::kInfo); +DECLARE_bool(prevent_kudu_2233_corruption); + using kudu::consensus::ALTER_SCHEMA_OP; using kudu::consensus::ConsensusBootstrapInfo; using kudu::consensus::ConsensusOptions; @@ -800,7 +803,8 @@ void TabletReplica::FinishConsensusOnlyRound(ConsensusRound* round) { // a no-op to assert a new leadership term, in which case it would be in order. 
if (op_type == consensus::NO_OP && (!replicate_msg->noop_request().has_timestamp_in_opid_order() || - replicate_msg->noop_request().timestamp_in_opid_order())) { + replicate_msg->noop_request().timestamp_in_opid_order()) && + PREDICT_TRUE(FLAGS_prevent_kudu_2233_corruption)) { DCHECK(replicate_msg->has_noop_request()); int64_t ts = replicate_msg->timestamp(); // We are guaranteed that the prepare pool token is running now because
