Add Reinserts to tablet_history_gc-itest

This adds Reinserts as a new operation to tablet_history_gc-itest.
Reinserts happen on previously deleted rows and, like deletes,
have a chance to happen mid-compaction.

This test used to fail frequently before the reinsert patches
landed, and now passes.

Ran 500 iterations of tablet_history_gc-itest on dist-test under ASAN with:
- KUDU_ALLOW_SLOW_TESTS=1
- --stress_cpu_threads=4

All tests passed. Result:
http://dist-test.cloudera.org//job?job_id=david.alves.1480101254.2208

Change-Id: I6b774b14d79bd7c0c0bf62b585bd70d6d975feaa
Reviewed-on: http://gerrit.cloudera.org:8080/4997
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Mike Percy <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6f423302
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6f423302
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6f423302

Branch: refs/heads/master
Commit: 6f423302d18460355f6b6111ba283a54ca2d9faa
Parents: 10f2ce2
Author: David Alves <[email protected]>
Authored: Mon Nov 7 21:15:45 2016 -0800
Committer: David Ribeiro Alves <[email protected]>
Committed: Wed Nov 30 10:14:48 2016 +0000

----------------------------------------------------------------------
 .../tablet_history_gc-itest.cc                  | 158 ++++++++++++++-----
 1 file changed, 121 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6f423302/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc 
b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index e9503c6..0bce2c3 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -17,6 +17,7 @@
 
 #include <map>
 #include <memory>
+#include <unordered_set>
 #include <utility>
 
 #include "kudu/client/client-test-util.h"
@@ -118,6 +119,7 @@ class RandomizedTabletHistoryGcITest : public 
TabletHistoryGcITest {
     kInsert,
     kUpdate,
     kDelete,
+    kReinsert,
     kFlush,
     kMergeCompaction,
     kRedoDeltaCompaction,
@@ -277,11 +279,11 @@ class RandomizedTabletHistoryGcITest : public 
TabletHistoryGcITest {
   }
 
   MaterializedTestSnapshots snapshots_;
+  std::unordered_set<int32_t> deleted_rows_;
   ScannerMap scanners_;
   HybridClock* clock_ = nullptr;
 
   Timestamp latest_snapshot_ts_;
-  int32_t rows_inserted_ = 0;
   int cur_round_ = 0;
 };
 
@@ -308,6 +310,13 @@ class ReupdateHooks : public 
Tablet::FlushCompactCommonHooks {
       CHECK_OK(row.SetInt32(0, row_key));
       CHECK_OK(writer.Delete(row));
     }
+    for (const MaterializedTestRow& reinsert : reinserts_) {
+      KuduPartialRow row(&client_schema_);
+      CHECK_OK(row.SetInt32(0, reinsert.key));
+      CHECK_OK(row.SetInt32(1, reinsert.int_val));
+      CHECK_OK(row.SetStringCopy(2, reinsert.string_val));
+      CHECK_OK(writer.Insert(row));
+    }
     return Status::OK();
   }
 
@@ -319,26 +328,29 @@ class ReupdateHooks : public 
Tablet::FlushCompactCommonHooks {
     deletes_ = std::move(deletes);
   }
 
+  void set_reinserts(vector<MaterializedTestRow> reinserts) {
+    reinserts_ = std::move(reinserts);
+  }
+
   void Reset() {
     updates_.clear();
     deletes_.clear();
+    reinserts_.clear();
   }
 
  private:
-  enum Action {
-    kUpdate,
-    kDelete
-  };
-
   Tablet* const tablet_;
   const Schema client_schema_;
 
   vector<MaterializedTestRow> updates_;
   vector<int32_t> deletes_;
+  vector<MaterializedTestRow> reinserts_;
 };
 
 // Randomized test that attempts to test many arbitrary history GC use cases.
 TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
+  const int kSessionTimeoutMillis = 20000;
+
   OverrideFlagForSlowTests("test_num_rounds",
                            Substitute("$0", FLAGS_test_num_rounds * 5));
 
@@ -376,6 +388,8 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
   // Save an empty snapshot at the "beginning of time";
   NO_FATALS(SaveSnapshot(MaterializedTestTable(), Timestamp(0)));
 
+  int32_t rows_inserted = 0;
+
   for (cur_round_ = 0; cur_round_ < FLAGS_test_num_rounds; cur_round_++) {
     VLOG(1) << "Starting round " << cur_round_;
     NO_FATALS(VerifyScannersForRound(cur_round_));
@@ -383,19 +397,17 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
     int action = random.Uniform(kNumActions);
     switch (action) {
       case kInsert: {
-        // TODO: Allow for reinsert onto deleted rows after KUDU-237 has been
-        // implemented.
         int32_t num_rows_to_insert = random.Uniform(1000);
         VLOG(1) << "Inserting " << num_rows_to_insert << " rows";
         if (num_rows_to_insert == 0) continue;
         MaterializedTestTable snapshot = CloneLatestSnapshot();
 
         client::sp::shared_ptr<client::KuduSession> session = 
client_->NewSession();
-        session->SetTimeoutMillis(20000);
+        session->SetTimeoutMillis(kSessionTimeoutMillis);
         
ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH));
 
         for (int32_t i = 0; i < num_rows_to_insert; i++) {
-          int32_t row_key = rows_inserted_;
+          int32_t row_key = rows_inserted;
           MaterializedTestRow test_row = { row_key,
                                            static_cast<int32_t>(random.Next()),
                                            Substitute("$0", random.Next()),
@@ -404,20 +416,20 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
           KuduPartialRow* row = insert->mutable_row();
           ASSERT_OK_FAST(row->SetInt32(0, test_row.key));
           ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val));
-          ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val));
+          ASSERT_OK_FAST(row->SetString(2, test_row.string_val));
           ASSERT_OK_FAST(session->Apply(insert.release()));
 
           VLOG(2) << "Inserting row " << StringifyTestRow(test_row);
           snapshot[row_key] = std::move(test_row);
-          rows_inserted_++;
+          rows_inserted++;
         }
         FlushSessionOrDie(session);
         SaveSnapshot(std::move(snapshot), clock_->Now());
         break;
       }
       case kUpdate: {
-        if (rows_inserted_ == 0) continue;
-        int32_t num_rows_to_update = random.Uniform(std::min(rows_inserted_, 
1000));
+        if (rows_inserted == 0) continue;
+        int32_t num_rows_to_update = random.Uniform(std::min(rows_inserted, 
1000));
         VLOG(1) << "Updating up to " << num_rows_to_update << " rows";
         if (num_rows_to_update == 0) continue;
 
@@ -431,18 +443,17 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
 
         vector<MaterializedTestRow> updates;
         for (int i = 0; i < num_rows_to_update; i++) {
-          int32_t row_key = random.Uniform(rows_inserted_);
-          MaterializedTestRow test_row = snapshot[row_key];
-          CHECK_EQ(row_key, test_row.key) << "Rows inserted: " << 
rows_inserted_
-              << ", row: " << StringifyTestRow(test_row);
-          if (test_row.is_deleted == DELETED) continue;
+          int32_t row_key = random.Uniform(rows_inserted);
+          MaterializedTestRow* test_row = &snapshot[row_key];
+          ASSERT_EQ(row_key, test_row->key) << "Rows inserted: " << 
rows_inserted
+              << ", row: " << StringifyTestRow(*test_row);
+          if (test_row->is_deleted == DELETED) continue;
 
-          test_row.int_val = random.Next();
-          test_row.string_val = Substitute("$0", random.Next());
+          test_row->int_val = random.Next();
+          test_row->string_val = Substitute("$0", random.Next());
 
-          VLOG(2) << "Updating row to " << StringifyTestRow(test_row);
-          updates.push_back(test_row);
-          snapshot[row_key] = std::move(test_row);
+          VLOG(2) << "Updating row to " << StringifyTestRow(*test_row);
+          updates.push_back(*test_row);
         }
 
         int rows_updated = updates.size();
@@ -457,7 +468,7 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
           tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the 
hook.
         } else {
           client::sp::shared_ptr<client::KuduSession> session = 
client_->NewSession();
-          session->SetTimeoutMillis(20000);
+          session->SetTimeoutMillis(kSessionTimeoutMillis);
           
ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH));
 
           for (const MaterializedTestRow& test_row : updates) {
@@ -465,7 +476,7 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
             KuduPartialRow* row = update->mutable_row();
             ASSERT_OK_FAST(row->SetInt32(0, test_row.key));
             ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val));
-            ASSERT_OK_FAST(row->SetStringCopy(2, test_row.string_val));
+            ASSERT_OK_FAST(row->SetString(2, test_row.string_val));
             ASSERT_OK_FAST(session->Apply(update.release()));
           }
           FlushSessionOrDie(session);
@@ -475,8 +486,8 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
         break;
       }
       case kDelete: {
-        if (rows_inserted_ == 0) continue;
-        int32_t num_rows_to_delete = random.Uniform(std::min(rows_inserted_, 
1000));
+        if (rows_inserted == 0) continue;
+        int32_t num_rows_to_delete = random.Uniform(std::min(rows_inserted, 
1000));
         VLOG(1) << "Deleting up to " << num_rows_to_delete << " rows";
         if (num_rows_to_delete == 0) continue;
 
@@ -490,23 +501,25 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
 
         vector<int32_t> deletes;
         for (int i = 0; i < num_rows_to_delete; i++) {
-          int32_t row_key = random.Uniform(rows_inserted_);
-          MaterializedTestRow test_row = snapshot[row_key];
-          CHECK_EQ(row_key, test_row.key);
+          int32_t row_key = random.Uniform(rows_inserted);
+          MaterializedTestRow* test_row = &snapshot[row_key];
+          CHECK_EQ(row_key, test_row->key);
 
-          if (test_row.is_deleted == DELETED) {
-            VLOG(2) << "Row " << test_row.key << " is already deleted";
+          if (test_row->is_deleted == DELETED) {
+            VLOG(2) << "Row " << test_row->key << " is already deleted";
             continue;
           }
-          VLOG(2) << "Deleting row " << row_key;
+
+          test_row->is_deleted = DELETED;
+          VLOG(2) << "Deleting row " << StringifyTestRow(*test_row);
           deletes.push_back(row_key);
-          test_row.is_deleted = DELETED;
-          snapshot[row_key] = std::move(test_row);
         }
 
         int rows_deleted = deletes.size();
         if (rows_deleted == 0) continue;
 
+        deleted_rows_.insert(deletes.begin(), deletes.end());
+
         if (force_reupdate_missed_deltas) {
           std::shared_ptr<ReupdateHooks> hooks =
               std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema());
@@ -516,7 +529,7 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
           tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the 
hook.
         } else {
           client::sp::shared_ptr<client::KuduSession> session = 
client_->NewSession();
-          session->SetTimeoutMillis(20000);
+          session->SetTimeoutMillis(kSessionTimeoutMillis);
           
ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH));
 
           for (int32_t row_key : deletes) {
@@ -531,6 +544,77 @@ TEST_F(RandomizedTabletHistoryGcITest, 
TestRandomHistoryGCWorkload) {
         VLOG(1) << "Deleted " << rows_deleted << " rows";
         break;
       }
+      case kReinsert: {
+        if (rows_inserted == 0) continue;
+        int32_t max_rows_to_reinsert = random.Uniform(std::min(rows_inserted, 
1000));
+        VLOG(1) << "Reinserting up to " << max_rows_to_reinsert << " rows";
+        if (max_rows_to_reinsert == 0) continue;
+        int num_deleted_rows = deleted_rows_.size();
+        if (num_deleted_rows == 0) continue;
+
+
+        // 5% chance to reupdate while also forcing a full compaction.
+        bool force_reupdate_missed_deltas = random.OneIn(20);
+        if (force_reupdate_missed_deltas) {
+          VLOG(1) << "Forcing a reupdate of missed deltas";
+        }
+
+        MaterializedTestTable snapshot = CloneLatestSnapshot();
+
+        vector<MaterializedTestRow> reinserts;
+        for (int i = 0; i < max_rows_to_reinsert; i++) {
+          const int32_t row_key = *std::next(deleted_rows_.begin(),
+                                             random.Uniform(num_deleted_rows));
+          MaterializedTestRow* test_row = &snapshot[row_key];
+          ASSERT_EQ(row_key, test_row->key);
+
+          if (test_row->is_deleted == NOT_DELETED) {
+            VLOG(2) << "Row " << test_row->key << " is already reinserted";
+            continue;
+          }
+
+          test_row->int_val = random.Next();
+          test_row->string_val = Substitute("$0", random.Next());
+          test_row->is_deleted = NOT_DELETED;
+
+          VLOG(2) << "Reinserting row " << StringifyTestRow(*test_row);
+          reinserts.push_back(*test_row);
+        }
+
+        if (reinserts.empty()) continue;
+
+        for (const MaterializedTestRow& test_row : reinserts) {
+          deleted_rows_.erase(test_row.key);
+        }
+
+        int num_reinserted = reinserts.size();
+
+        if (force_reupdate_missed_deltas) {
+          std::shared_ptr<ReupdateHooks> hooks =
+              std::make_shared<ReupdateHooks>(tablet, GetSimpleTestSchema());
+          hooks->set_reinserts(std::move(reinserts));
+          tablet->SetFlushCompactCommonHooksForTests(hooks);
+          ASSERT_OK(tablet->Compact(Tablet::FORCE_COMPACT_ALL));
+          tablet->SetFlushCompactCommonHooksForTests(nullptr); // Clear the 
hook.
+        } else {
+          client::sp::shared_ptr<client::KuduSession> session = 
client_->NewSession();
+          session->SetTimeoutMillis(kSessionTimeoutMillis);
+          
ASSERT_OK_FAST(session->SetFlushMode(client::KuduSession::MANUAL_FLUSH));
+
+          for (const MaterializedTestRow& test_row : reinserts) {
+            unique_ptr<client::KuduInsert> reinsert(table->NewInsert());
+            KuduPartialRow* row = reinsert->mutable_row();
+            ASSERT_OK_FAST(row->SetInt32(0, test_row.key));
+            ASSERT_OK_FAST(row->SetInt32(1, test_row.int_val));
+            ASSERT_OK_FAST(row->SetString(2, test_row.string_val));
+            ASSERT_OK_FAST(session->Apply(reinsert.release()));
+          }
+          FlushSessionOrDie(session);
+        }
+        SaveSnapshot(std::move(snapshot), clock_->Now());
+        VLOG(1) << "Reinserted " << num_reinserted << " rows";
+        break;
+      }
       case kFlush: {
         if (random.OneIn(2)) {
           VLOG(1) << "Flushing tablet";

Reply via email to