Add Reinserts to tablet_history_gc-itest

This adds Reinserts as a new operation to tablet_history_gc-itest. Reinserts happen on previously deleted rows and, like deletes, have a chance to happen mid-compaction.
This test used to fail frequently before the reinsert patches are in and now passes. Ran 500 loops of tablet_history_gc-itest in dist-test in ASAN with: - KUDU_ALLOW_SLOW_TESTS=1 - --stress_cpu_threads=4 All tests passed. Result: http://dist-test.cloudera.org//job?job_id=david.alves.1480101254.2208 Change-Id: I6b774b14d79bd7c0c0bf62b585bd70d6d975feaa Reviewed-on: http://gerrit.cloudera.org:8080/4997 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6f423302 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6f423302 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6f423302 Branch: refs/heads/master Commit: 6f423302d18460355f6b6111ba283a54ca2d9faa Parents: 10f2ce2 Author: David Alves <[email protected]> Authored: Mon Nov 7 21:15:45 2016 -0800 Committer: David Ribeiro Alves <[email protected]> Committed: Wed Nov 30 10:14:48 2016 +0000 ---------------------------------------------------------------------- .../tablet_history_gc-itest.cc | 158 ++++++++++++++----- 1 file changed, 121 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/6f423302/src/kudu/integration-tests/tablet_history_gc-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc index e9503c6..0bce2c3 100644 --- a/src/kudu/integration-tests/tablet_history_gc-itest.cc +++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc @@ -17,6 +17,7 @@ #include <map> #include <memory> +#include <unordered_set> #include <utility> #include "kudu/client/client-test-util.h" @@ -118,6 +119,7 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest { kInsert, kUpdate, kDelete, + 
kReinsert, kFlush, kMergeCompaction, kRedoDeltaCompaction, @@ -277,11 +279,11 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest { } MaterializedTestSnapshots snapshots_; + std::unordered_set<int32_t> deleted_rows_; ScannerMap scanners_; HybridClock* clock_ = nullptr; Timestamp latest_snapshot_ts_; - int32_t rows_inserted_ = 0; int cur_round_ = 0; }; @@ -308,6 +310,13 @@ class ReupdateHooks : public Tablet::FlushCompactCommonHooks { CHECK_OK(row.SetInt32(0, row_key)); CHECK_OK(writer.Delete(row)); } + for (const MaterializedTestRow& reinsert : reinserts_) { + KuduPartialRow row(&client_schema_); + CHECK_OK(row.SetInt32(0, reinsert.key)); + CHECK_OK(row.SetInt32(1, reinsert.int_val)); + CHECK_OK(row.SetStringCopy(2, reinsert.string_val)); + CHECK_OK(writer.Insert(row)); + } return Status::OK(); } @@ -319,26 +328,29 @@ class ReupdateHooks : public Tablet::FlushCompactCommonHooks { deletes_ = std::move(deletes); } + void set_reinserts(vector<MaterializedTestRow> reinserts) { + reinserts_ = std::move(reinserts); + } + void Reset() { updates_.clear(); deletes_.clear(); + reinserts_.clear(); } private: - enum Action { - kUpdate, - kDelete - }; - Tablet* const tablet_; const Schema client_schema_; vector<MaterializedTestRow> updates_; vector<int32_t> deletes_; + vector<MaterializedTestRow> reinserts_; }; // Randomized test that attempts to test many arbitrary history GC use cases. 
TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { + const int kSessionTimeoutMillis = 20000; + OverrideFlagForSlowTests("test_num_rounds", Substitute("$0", FLAGS_test_num_rounds * 5)); @@ -376,6 +388,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { // Save an empty snapshot at the "beginning of time"; NO_FATALS(SaveSnapshot(MaterializedTestTable(), Timestamp(0))); + int32_t rows_inserted = 0; + for (cur_round_ = 0; cur_round_ < FLAGS_test_num_rounds; cur_round_++) { VLOG(1) << "Starting round " << cur_round_; NO_FATALS(VerifyScannersForRound(cur_round_)); @@ -383,19 +397,17 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { int action = random.Uniform(kNumActions); switch (action) { case kInsert: { - // TODO: Allow for reinsert onto deleted rows after KUDU-237 has been - // implemented. int32_t num_rows_to_insert = random.Uniform(1000); VLOG(1) << "Inserting " << num_rows_to_insert << " rows"; if (num_rows_to_insert == 0) continue; MaterializedTestTable snapshot = CloneLatestSnapshot(); client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); - session->SetTimeoutMillis(20000); + session->SetTimeoutMillis(kSessionTimeoutMillis); ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); for (int32_t i = 0; i < num_rows_to_insert; i++) { - int32_t row_key = rows_inserted_; + int32_t row_key = rows_inserted; MaterializedTestRow test_row = { row_key, static_cast<int32_t>(random.Next()), Substitute("$0", random.Next()), @@ -404,20 +416,20 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { KuduPartialRow* row = insert->mutable_row(); ASSERT_OK_FAST(row->SetInt32(0, test_row.key)); ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val)); - ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val)); + ASSERT_OK_FAST(row->SetString(2, test_row.string_val)); ASSERT_OK_FAST(session->Apply(insert.release())); VLOG(2) << "Inserting row " << 
StringifyTestRow(test_row); snapshot[row_key] = std::move(test_row); - rows_inserted_++; + rows_inserted++; } FlushSessionOrDie(session); SaveSnapshot(std::move(snapshot), clock_->Now()); break; } case kUpdate: { - if (rows_inserted_ == 0) continue; - int32_t num_rows_to_update = random.Uniform(std::min(rows_inserted_, 1000)); + if (rows_inserted == 0) continue; + int32_t num_rows_to_update = random.Uniform(std::min(rows_inserted, 1000)); VLOG(1) << "Updating up to " << num_rows_to_update << " rows"; if (num_rows_to_update == 0) continue; @@ -431,18 +443,17 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { vector<MaterializedTestRow> updates; for (int i = 0; i < num_rows_to_update; i++) { - int32_t row_key = random.Uniform(rows_inserted_); - MaterializedTestRow test_row = snapshot[row_key]; - CHECK_EQ(row_key, test_row.key) << "Rows inserted: " << rows_inserted_ - << ", row: " << StringifyTestRow(test_row); - if (test_row.is_deleted == DELETED) continue; + int32_t row_key = random.Uniform(rows_inserted); + MaterializedTestRow* test_row = &snapshot[row_key]; + ASSERT_EQ(row_key, test_row->key) << "Rows inserted: " << rows_inserted + << ", row: " << StringifyTestRow(*test_row); + if (test_row->is_deleted == DELETED) continue; - test_row.int_val = random.Next(); - test_row.string_val = Substitute("$0", random.Next()); + test_row->int_val = random.Next(); + test_row->string_val = Substitute("$0", random.Next()); - VLOG(2) << "Updating row to " << StringifyTestRow(test_row); - updates.push_back(test_row); - snapshot[row_key] = std::move(test_row); + VLOG(2) << "Updating row to " << StringifyTestRow(*test_row); + updates.push_back(*test_row); } int rows_updated = updates.size(); @@ -457,7 +468,7 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the hook. 
} else { client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); - session->SetTimeoutMillis(20000); + session->SetTimeoutMillis(kSessionTimeoutMillis); ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); for (const MaterializedTestRow& test_row : updates) { @@ -465,7 +476,7 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { KuduPartialRow* row = update->mutable_row(); ASSERT_OK_FAST(row->SetInt32(0, test_row.key)); ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val)); - ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val)); + ASSERT_OK_FAST(row->SetString(2, test_row.string_val)); ASSERT_OK_FAST(session->Apply(update.release())); } FlushSessionOrDie(session); @@ -475,8 +486,8 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { break; } case kDelete: { - if (rows_inserted_ == 0) continue; - int32_t num_rows_to_delete = random.Uniform(std::min(rows_inserted_, 1000)); + if (rows_inserted == 0) continue; + int32_t num_rows_to_delete = random.Uniform(std::min(rows_inserted, 1000)); VLOG(1) << "Deleting up to " << num_rows_to_delete << " rows"; if (num_rows_to_delete == 0) continue; @@ -490,23 +501,25 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { vector<int32_t> deletes; for (int i = 0; i < num_rows_to_delete; i++) { - int32_t row_key = random.Uniform(rows_inserted_); - MaterializedTestRow test_row = snapshot[row_key]; - CHECK_EQ(row_key, test_row.key); + int32_t row_key = random.Uniform(rows_inserted); + MaterializedTestRow* test_row = &snapshot[row_key]; + CHECK_EQ(row_key, test_row->key); - if (test_row.is_deleted == DELETED) { - VLOG(2) << "Row " << test_row.key << " is already deleted"; + if (test_row->is_deleted == DELETED) { + VLOG(2) << "Row " << test_row->key << " is already deleted"; continue; } - VLOG(2) << "Deleting row " << row_key; + + test_row->is_deleted = DELETED; + VLOG(2) << "Deleting row " << StringifyTestRow(*test_row); 
deletes.push_back(row_key); - test_row.is_deleted = DELETED; - snapshot[row_key] = std::move(test_row); } int rows_deleted = deletes.size(); if (rows_deleted == 0) continue; + deleted_rows_.insert(deletes.begin(), deletes.end()); + if (force_reupdate_missed_deltas) { std::shared_ptr<ReupdateHooks> hooks = std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema()); @@ -516,7 +529,7 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the hook. } else { client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); - session->SetTimeoutMillis(20000); + session->SetTimeoutMillis(kSessionTimeoutMillis); ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); for (int32_t row_key : deletes) { @@ -531,6 +544,77 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) { VLOG(1) << "Deleted " << rows_deleted << " rows"; break; } + case kReinsert: { + if (rows_inserted == 0) continue; + int32_t max_rows_to_reinsert = random.Uniform(std::min(rows_inserted, 1000)); + VLOG(1) << "Reinserting up to " << max_rows_to_reinsert << " rows"; + if (max_rows_to_reinsert == 0) continue; + int num_deleted_rows = deleted_rows_.size(); + if (num_deleted_rows == 0) continue; + + + // 5% chance to reupdate while also forcing a full compaction. 
+ bool force_reupdate_missed_deltas = random.OneIn(20); + if (force_reupdate_missed_deltas) { + VLOG(1) << "Forcing a reupdate of missed deltas"; + } + + MaterializedTestTable snapshot = CloneLatestSnapshot(); + + vector<MaterializedTestRow> reinserts; + for (int i = 0; i < max_rows_to_reinsert; i++) { + const int32_t row_key = *std::next(deleted_rows_.begin(), + random.Uniform(num_deleted_rows)); + MaterializedTestRow* test_row = &snapshot[row_key]; + ASSERT_EQ(row_key, test_row->key); + + if (test_row->is_deleted == NOT_DELETED) { + VLOG(2) << "Row " << test_row->key << " is already reinserted"; + continue; + } + + test_row->int_val = random.Next(); + test_row->string_val = Substitute("$0", random.Next()); + test_row->is_deleted = NOT_DELETED; + + VLOG(2) << "Reinserting row " << StringifyTestRow(*test_row); + reinserts.push_back(*test_row); + } + + if (reinserts.empty()) continue; + + for (const MaterializedTestRow& test_row : reinserts) { + deleted_rows_.erase(test_row.key); + } + + int num_reinserted = reinserts.size(); + + if (force_reupdate_missed_deltas) { + std::shared_ptr<ReupdateHooks> hooks = + std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema()); + hooks->set_reinserts(std::move(reinserts)); + tablet->SetFlushCompactCommonHooksForTests(hooks); + ASSERT_OK(tablet->Compact(Tablet::FORCE_COMPACT_ALL)); + tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the hook. 
+ } else { + client::sp::shared_ptr<client::KuduSession> session = client_->NewSession(); + session->SetTimeoutMillis(kSessionTimeoutMillis); + ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH)); + + for (const MaterializedTestRow& test_row : reinserts) { + unique_ptr<client::KuduInsert> reinsert(table->NewInsert()); + KuduPartialRow* row = reinsert->mutable_row(); + ASSERT_OK_FAST(row->SetInt32(0, test_row.key)); + ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val)); + ASSERT_OK_FAST(row->SetString(2, test_row.string_val)); + ASSERT_OK_FAST(session->Apply(reinsert.release())); + } + FlushSessionOrDie(session); + } + SaveSnapshot(std::move(snapshot), clock_->Now()); + VLOG(1) << "Reinserted " << num_reinserted << " rows"; + break; + } case kFlush: { if (random.OneIn(2)) { VLOG(1) << "Flushing tablet";
