This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b7a5280 [tablet] report per-thread status in mt-tablet-test
a0b7a5280 is described below

commit a0b7a52800de1a03f66c963e25feee8c63041acd
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Sep 9 14:46:10 2024 -0700

    [tablet] report per-thread status in mt-tablet-test
    
    With this patch, mt-tablet-test now records per-thread status
    and reports it at the very end of the scenario instead of crashing
    at the first sight of a non-OK status in a per-thread routine.
    This patch also contains other minor modifications to make the code
    in mt-tablet-test more robust.  This is in the context of KUDU-3465.
    
    The motivation behind this update is to make sure that the sanitizers
    (UBSAN/ASAN/TSAN, etc.) have a chance to report on an issue, if any.
    
    Change-Id: I2e2cb8c3bbc9b12c5d45e603de1b0aeb14191f12
    Reviewed-on: http://gerrit.cloudera.org:8080/21776
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Mahesh Reddy <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/tablet/mt-tablet-test.cc | 294 ++++++++++++++++++++++----------------
 1 file changed, 168 insertions(+), 126 deletions(-)

diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 1e20dad9a..abf5359bb 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -17,13 +17,14 @@
 
 #include <cstdint>
 #include <cstdlib>
+#include <deque>
 #include <functional>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <thread>
 #include <type_traits>
-#include <vector>
+#include <utility>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -52,8 +53,11 @@
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_graph.h"
+#include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+// IWYU pragma: no_forward_declare kudu::tablet::MultiThreadedTabletTest
+
 DECLARE_int32(tablet_history_max_age_sec);
 DECLARE_double(tablet_delta_store_major_compact_min_ratio);
 DECLARE_int32(tablet_delta_store_minor_compact_max);
@@ -75,13 +79,15 @@ DEFINE_int32(num_major_compact_deltas_threads, 1,
 DEFINE_int64(inserts_per_thread, 1000,
              "Number of rows inserted by each inserter thread");
 DEFINE_int32(tablet_test_flush_threshold_mb, 0, "Minimum memrowset size to 
flush");
-DEFINE_double(flusher_backoff, 2.0f, "Ratio to backoff the flusher thread");
+DEFINE_double(flusher_backoff, 2.0F, "Ratio to backoff the flusher thread");
 DEFINE_int32(flusher_initial_frequency_ms, 30, "Number of ms to wait between 
flushes");
 
+using std::deque;
 using std::shared_ptr;
 using std::thread;
 using std::unique_ptr;
-using std::vector;
+using strings::Substitute;
+using testing::UnitTest;
 
 namespace kudu {
 namespace tablet {
@@ -90,6 +96,14 @@ namespace {
 const MonoDelta kBackgroundOpInterval = MonoDelta::FromMilliseconds(100);
 } // anonymous namespace
 
+#define THR_RETURN_NOT_OK(tid, expr) \
+  do { \
+    if (auto s = (expr); !s.ok()) { \
+      threads_statuses_[(tid)] = std::move(s); \
+      return; \
+    } \
+  } while (false)
+
 template<class SETUP>
 class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   // Import some names from superclass, since C++ is stingy about
@@ -105,13 +119,13 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
 
     // Warm up code cache with all the projections we'll be using.
     unique_ptr<RowwiseIterator> iter;
-    CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
+    ASSERT_OK(tablet()->NewRowIterator(client_schema_, &iter));
     uint64_t count;
-    CHECK_OK(tablet()->CountRows(&count));
+    ASSERT_OK(tablet()->CountRows(&count));
     const Schema* schema = tablet()->schema().get();
-    ColumnSchema valcol = schema->column(schema->find_column("val"));
+    const ColumnSchema& valcol = schema->column(schema->find_column("val"));
     valcol_projection_ = Schema({ valcol }, 0);
-    CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
+    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
     codegen::CompilationManager::GetSingleton()->Wait();
 
     ts_collector_.StartDumperThread();
@@ -119,11 +133,11 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
 
   explicit MultiThreadedTabletTest(TabletHarness::Options::ClockType 
clock_type =
                                    
TabletHarness::Options::ClockType::LOGICAL_CLOCK)
-    : TabletTestBase<SETUP>(clock_type),
-      running_insert_count_(FLAGS_num_insert_threads),
-      
ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name())
 {}
+      : TabletTestBase<SETUP>(clock_type),
+        running_insert_count_(FLAGS_num_insert_threads),
+        
ts_collector_(UnitTest::GetInstance()->current_test_info()->test_suite_name()) 
{}
 
-  void InsertThread(int tid) {
+  void InsertThread(size_t group_tid, size_t /*tid*/) {
     CountDownOnScopeExit dec_count(&running_insert_count_);
     shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
 
@@ -137,37 +151,39 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
       LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " 
to prevent overflow";
     }
 
-    this->InsertTestRows(tid * max_rows,
+    this->InsertTestRows(group_tid * max_rows,
                          max_rows, 0,
                          inserts.get());
   }
 
-  void UpdateThread(int tid) {
-    const Schema &schema = schema_;
+  void UpdateThread(size_t /*group_tid*/, size_t tid) {
+    // This reference is introduced to avoid compiler errors in the code below
+    // due to intricacies of typed tests.
+    const Schema& schema = schema_;
 
     shared_ptr<TimeSeries> updates = ts_collector_.GetTimeSeries("updated");
 
-    LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
+    LocalTabletWriter writer(tablet().get(), &this->client_schema_);
 
     RowBlockMemory mem(1024);
-    RowBlock block(&schema_, 1, &mem);
+    RowBlock block(&schema, 1, &mem);
     faststring update_buf;
 
     uint64_t updates_since_last_report = 0;
-    int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
+    const int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
     LOG(INFO) << "Update thread using schema: " << schema.ToString();
 
     KuduPartialRow row(&client_schema_);
 
     while (running_insert_count_.count() > 0) {
       unique_ptr<RowwiseIterator> iter;
-      CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
-      CHECK_OK(iter->Init(nullptr));
+      THR_RETURN_NOT_OK(tid, tablet()->NewRowIterator(client_schema_, &iter));
+      THR_RETURN_NOT_OK(tid, iter->Init(nullptr));
 
       while (iter->HasNext() && running_insert_count_.count() > 0) {
         mem.Reset();
-        CHECK_OK(iter->NextBlock(&block));
-        CHECK_EQ(block.nrows(), 1);
+        THR_RETURN_NOT_OK(tid, iter->NextBlock(&block));
+        CHECK_EQ(1, block.nrows());
 
         if (!block.selection_vector()->IsRowSelected(0)) {
           // Don't try to update rows which aren't visible yet --
@@ -175,24 +191,18 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
           continue;
         }
 
-
         RowBlockRow rb_row = block.row(0);
         if (rand() % 10 == 7) {
           // Increment the "val"
-          const int32_t *old_val = schema.ExtractColumnFromRow<INT32>(rb_row, 
col_idx);
+          const int32_t* old_val = schema.ExtractColumnFromRow<INT32>(rb_row, 
col_idx);
           // Issue an update. In the NullableValue setup, many of the rows 
start with
           // NULL here, so we have to check for it.
-          int32_t new_val;
-          if (old_val != nullptr) {
-            new_val = *old_val + 1;
-          } else {
-            new_val = 0;
-          }
+          const int32_t new_val = (old_val != nullptr) ? (*old_val + 1) : 0;
 
           // Rebuild the key by extracting the cells from the row
           setup_.BuildRowKeyFromExistingRow(&row, rb_row);
-          CHECK_OK(row.SetInt32(col_idx, new_val));
-          CHECK_OK(writer.Update(row));
+          THR_RETURN_NOT_OK(tid, row.SetInt32(col_idx, new_val));
+          THR_RETURN_NOT_OK(tid, writer.Update(row));
 
           if (++updates_since_last_report >= 10) {
             updates->AddValue(updates_since_last_report);
@@ -205,12 +215,12 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
 
   // Thread which repeatedly issues CountRows() and makes sure
   // that the count doesn't go ever down.
-  void CountThread(int tid) {
+  void CountThread(size_t /*group_tid*/, size_t tid) {
     rowid_t last_count = 0;
     while (running_insert_count_.count() > 0) {
       uint64_t count;
-      CHECK_OK(tablet()->CountRows(&count));
-      ASSERT_GE(count, last_count);
+      THR_RETURN_NOT_OK(tid, tablet()->CountRows(&count));
+      CHECK_GE(count, last_count);
       last_count = count;
     }
   }
@@ -218,23 +228,23 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
   // Thread which iterates slowly over the first 10% of the data.
   // This is meant to test that outstanding iterators don't end up
   // trying to reference already-freed memrowset memory.
-  void SlowReaderThread(int /*tid*/) {
+  void SlowReaderThread(size_t /*group_tid*/, size_t tid) {
     RowBlockMemory mem(32 * 1024);
     RowBlock block(&schema_, 1, &mem);
 
-    uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * 
FLAGS_num_insert_threads)
+    const uint64_t max_rows =
+        this->ClampRowCount(FLAGS_inserts_per_thread * 
FLAGS_num_insert_threads)
             / FLAGS_num_insert_threads;
-
-    int max_iters = FLAGS_num_insert_threads * max_rows / 10;
+    const int max_iters = FLAGS_num_insert_threads * max_rows / 10;
 
     while (running_insert_count_.count() > 0) {
       unique_ptr<RowwiseIterator> iter;
-      CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
-      CHECK_OK(iter->Init(nullptr));
+      THR_RETURN_NOT_OK(tid, tablet()->NewRowIterator(client_schema_, &iter));
+      THR_RETURN_NOT_OK(tid, iter->Init(nullptr));
 
       for (int i = 0; i < max_iters && iter->HasNext(); i++) {
         mem.Reset();
-        CHECK_OK(iter->NextBlock(&block));
+        THR_RETURN_NOT_OK(tid, iter->NextBlock(&block));
 
         if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
           return;
@@ -243,19 +253,17 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     }
   }
 
-  void SummerThread(int tid) {
-    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries(
-      "scanned");
-
+  void SummerThread(size_t /*group_tid*/, size_t tid) {
+    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
     while (running_insert_count_.count() > 0) {
-      CountSum(scanned_ts);
+      THR_RETURN_NOT_OK(tid, CountSum(scanned_ts));
     }
   }
 
-  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
+  Status CountSum(const shared_ptr<TimeSeries>& scanned_ts, uint64_t* res = 
nullptr) {
     RowBlockMemory mem(1024);  // unused, just scanning ints
 
-    static const int kBufInts = 1024*1024 / 8;
+    constexpr const int kBufInts = 1024 * 1024 / 8;
     RowBlock block(&valcol_projection_, kBufInts, &mem);
     ColumnBlock column = block.column_block(0);
 
@@ -264,12 +272,12 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     int64_t sum = 0;
 
     unique_ptr<RowwiseIterator> iter;
-    CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
-    CHECK_OK(iter->Init(nullptr));
+    RETURN_NOT_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
+    RETURN_NOT_OK(iter->Init(nullptr));
 
     while (iter->HasNext()) {
       mem.Reset();
-      CHECK_OK(iter->NextBlock(&block));
+      RETURN_NOT_OK(iter->NextBlock(&block));
 
       for (size_t j = 0; j < block.nrows(); j++) {
         sum += *reinterpret_cast<const int32_t *>(column.cell_ptr(j));
@@ -289,10 +297,13 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
       scanned_ts->AddValue(count_since_report);
     }
 
-    return sum;
+    if (res) {
+      *res = sum;
+    }
+    return Status::OK();
   }
 
-  void FlushThread(int /*tid*/) {
+  void FlushThread(size_t /*group_tid*/, size_t tid) {
     // Start off with a very short wait time between flushes.
     // But, especially in debug mode, this will only allow a few
     // rows to get inserted between each flush, and the test will take
@@ -301,13 +312,13 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     while (running_insert_count_.count() > 0) {
 
       if (tablet()->MemRowSetSize() > FLAGS_tablet_test_flush_threshold_mb * 
1024 * 1024) {
-        CHECK_OK(tablet()->Flush());
+        THR_RETURN_NOT_OK(tid, tablet()->Flush());
       } else {
         LOG(INFO) << "Not flushing, memrowset not very full";
       }
 
       if (tablet()->DeltaMemStoresSize() > 
FLAGS_tablet_test_flush_threshold_mb * 1024 * 1024) {
-        CHECK_OK(tablet()->FlushBiggestDMSForTests());
+        THR_RETURN_NOT_OK(tid, tablet()->FlushBiggestDMSForTests());
       }
 
       // Wait, unless the inserters are all done.
@@ -316,52 +327,53 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     }
   }
 
-  void FlushDeltasThread(int /*tid*/) {
+  void FlushDeltasThread(size_t /*group_tid*/, size_t tid) {
     while (running_insert_count_.count() > 0) {
-      CHECK_OK(tablet()->FlushBiggestDMSForTests());
+      THR_RETURN_NOT_OK(tid, tablet()->FlushBiggestDMSForTests());
 
       // Wait, unless the inserters are all done.
       running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
-  void MinorCompactDeltasThread(int /*tid*/) {
-    CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
+  void MinorCompactDeltasThread(size_t /*group_tid*/, size_t tid) {
+    THR_RETURN_NOT_OK(tid, CompactDeltas(RowSet::MINOR_DELTA_COMPACTION));
   }
 
-  void MajorCompactDeltasThread(int /*tid*/) {
-    CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
+  void MajorCompactDeltasThread(size_t /*group_tid*/, size_t tid) {
+    THR_RETURN_NOT_OK(tid, CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION));
   }
 
-  void CompactDeltas(RowSet::DeltaCompactionType type) {
+  Status CompactDeltas(RowSet::DeltaCompactionType type) {
     while (running_insert_count_.count() > 0) {
       VLOG(1) << "Compacting worst deltas";
-      CHECK_OK(tablet()->CompactWorstDeltas(type));
+      RETURN_NOT_OK(tablet()->CompactWorstDeltas(type));
 
       // Wait, unless the inserters are all done.
       running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
+    return Status::OK();
   }
 
-  void CompactThread(int /*tid*/) {
+  void CompactThread(size_t /*group_tid*/, size_t tid) {
     while (running_insert_count_.count() > 0) {
-      CHECK_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+      THR_RETURN_NOT_OK(tid, tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
 
       // Wait, unless the inserters are all done.
       running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
-  void DeleteAncientUndoDeltasThread(int /*tid*/) {
+  void DeleteAncientUndoDeltasThread(size_t /*group_tid*/, size_t tid) {
     while (running_insert_count_.count() > 0) {
-      MonoDelta time_budget = kBackgroundOpInterval;
+      const MonoDelta time_budget = kBackgroundOpInterval;
       int64_t bytes_in_ancient_undos = 0;
-      CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget, 
&bytes_in_ancient_undos));
+      THR_RETURN_NOT_OK(tid, tablet()->InitAncientUndoDeltas(time_budget, 
&bytes_in_ancient_undos));
       VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient 
delta undos";
 
       int64_t blocks_deleted = 0;
       int64_t bytes_deleted = 0;
-      CHECK_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, 
&bytes_deleted));
+      THR_RETURN_NOT_OK(tid, 
tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
       if (blocks_deleted > 0) {
         LOG(INFO) << "Deleted " << blocks_deleted << " blocks (" << 
bytes_deleted << " bytes) "
                   << "of ancient delta undos";
@@ -374,30 +386,31 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
 
   // Thread that looks for rowsets that are ancient and fully deleted, GCing
   // those that are.
-  void DeleteAncientDeletedRowsetsThreads(int /*tid*/) {
+  void DeleteAncientDeletedRowsetsThreads(size_t /*group_tid*/, size_t tid) {
     do {
-      int64_t bytes_in_ancient_deleted_rowsets = 0;
-      
CHECK_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes_in_ancient_deleted_rowsets));
+      // Number of bytes in ancient deleted rowsets.
+      int64_t adr_bytes = 0;
+      THR_RETURN_NOT_OK(tid, 
tablet()->GetBytesInAncientDeletedRowsets(&adr_bytes));
       VLOG(1) << Substitute("Found $0 bytes in ancient, fully deleted rowsets",
-                            bytes_in_ancient_deleted_rowsets);
-      if (bytes_in_ancient_deleted_rowsets > 0) {
-        CHECK_OK(tablet()->DeleteAncientDeletedRowsets());
+                            adr_bytes);
+      if (adr_bytes > 0) {
+        THR_RETURN_NOT_OK(tid, tablet()->DeleteAncientDeletedRowsets());
         LOG(INFO) << Substitute("Deleted $0 bytes found in ancient, fully 
deleted rowsets",
-                                bytes_in_ancient_deleted_rowsets);
+                                adr_bytes);
       }
     } while (!running_insert_count_.WaitFor(kBackgroundOpInterval));
   }
 
   // Thread which cycles between inserting and deleting a test row, each time
   // with a different value.
-  void DeleteAndReinsertCycleThread(int tid) {
+  void DeleteAndReinsertCycleThread(size_t group_tid, size_t tid) {
     int32_t iteration = 0;
     LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
 
     while (running_insert_count_.count() > 0) {
       for (int i = 0; i < 100; i++) {
-        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
-        CHECK_OK(this->DeleteTestRow(&writer, tid));
+        THR_RETURN_NOT_OK(tid, this->InsertTestRow(&writer, group_tid, 
iteration++));
+        THR_RETURN_NOT_OK(tid, this->DeleteTestRow(&writer, group_tid));
       }
     }
   }
@@ -406,15 +419,16 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
   // "not found" errors that might come back. This is used simultaneously with
   // DeleteAndReinsertCycleThread to check for races where we might 
accidentally
   // succeed in UPDATING a ghost row.
-  void StubbornlyUpdateSameRowThread(int tid) {
+  void StubbornlyUpdateSameRowThread(size_t group_tid, size_t tid) {
     int32_t iteration = 0;
     LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
     while (running_insert_count_.count() > 0) {
       for (int i = 0; i < 100; i++) {
-        Status s = this->UpdateTestRow(&writer, tid, iteration++);
+        Status s = this->UpdateTestRow(&writer, group_tid, iteration++);
         if (!s.ok() && !s.IsNotFound()) {
           // We expect "not found", but not any other errors.
-          CHECK_OK(s);
+          threads_statuses_[tid] = std::move(s);
+          return;
         }
       }
     }
@@ -423,7 +437,7 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
   // Thread which wakes up periodically and collects metrics like memrowset
   // size, etc. Eventually we should have a metrics system to collect things
   // like this, but for now, this is what we've got.
-  void CollectStatisticsThread(int tid) {
+  void CollectStatisticsThread(size_t /*group_tid*/, size_t /*tid*/) {
     shared_ptr<TimeSeries> num_rowsets_ts = ts_collector_.GetTimeSeries(
       "num_rowsets");
     shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries(
@@ -443,19 +457,42 @@ class MultiThreadedTabletTest : public 
TabletTestBase<SETUP> {
     }
   }
 
-  void StartThreads(int n_threads, const std::function<void(int)>& function) {
-    for (int i = 0; i < n_threads; i++) {
-      threads_.emplace_back([=]() { function(i); });
+  // Start a group of 'threads_num' threads, each running the specified
+  // 'function' routine. The first parameter of the function is the identifier
+  // of the thread in its group, the second parameter is the global identifier
+  // of the thread among all the threads started by the test fixture.  The
+  // latter is used as the index of the corresponding data structures
+  // in the 'threads_' and the 'threads_statuses_' containers.
+  void StartThreads(size_t threads_num,
+                    const std::function<void(size_t, size_t)>& function) {
+    DCHECK_EQ(threads_.size(), threads_statuses_.size());
+    const size_t idx_offset = threads_.size();
+    for (size_t idx = 0; idx < threads_num; ++idx) {
+      threads_statuses_.emplace_back(Status::OK());
+      threads_.emplace_back([=]() { function(idx, idx_offset + idx); });
     }
   }
 
   void JoinThreads() {
+    // First, allow all the threads to join.
     for (auto& t : threads_) {
       t.join();
     }
+
+    // Now, check if there were errors reported by any of the threads.
+    const auto size = threads_statuses_.size();
+    DCHECK_EQ(threads_.size(), size);
+    for (auto idx = 0; idx < size; ++idx) {
+      SCOPED_TRACE(Substitute(
+          "$0 thread ID $1", 
UnitTest::GetInstance()->current_test_info()->name(), idx));
+      // Using EXPECT_OK() to allow for all the non-OK statuses to be reported.
+      EXPECT_OK(threads_statuses_[idx]);
+    }
   }
 
-  vector<thread> threads_;
+  deque<thread> threads_;
+  deque<Status> threads_statuses_;
+
   CountDownLatch running_insert_count_;
 
   // Projection with only an int column.
@@ -477,39 +514,42 @@ TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) {
   }
 
   // Spawn a bunch of threads, each of which will do updates.
-  this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
+  this->StartThreads(1,
+                     [this](size_t i, size_t j) { 
this->CollectStatisticsThread(i, j); });
   this->StartThreads(FLAGS_num_insert_threads,
-                     [this](int i) { this->InsertThread(i); });
+                     [this](size_t i, size_t j) { this->InsertThread(i, j); });
   this->StartThreads(FLAGS_num_counter_threads,
-                     [this](int i) { this->CountThread(i); });
+                     [this](size_t i, size_t j) { this->CountThread(i, j); });
   this->StartThreads(FLAGS_num_summer_threads,
-                     [this](int i) { this->SummerThread(i); });
+                     [this](size_t i, size_t j) { this->SummerThread(i, j); });
   this->StartThreads(FLAGS_num_flush_threads,
-                     [this](int i) { this->FlushThread(i); });
+                     [this](size_t i, size_t j) { this->FlushThread(i, j); });
   this->StartThreads(FLAGS_num_compact_threads,
-                     [this](int i) { this->CompactThread(i); });
+                     [this](size_t i, size_t j) { this->CompactThread(i, j); 
});
   this->StartThreads(FLAGS_num_undo_delta_gc_threads,
-                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); 
});
+                     [this](size_t i, size_t j) { 
this->DeleteAncientUndoDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_flush_delta_threads,
-                     [this](int i) { this->FlushDeltasThread(i); });
+                     [this](size_t i, size_t j) { this->FlushDeltasThread(i, 
j); });
   this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
-                     [this](int i) { this->MinorCompactDeltasThread(i); });
+                     [this](size_t i, size_t j) { 
this->MinorCompactDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_major_compact_deltas_threads,
-                     [this](int i) { this->MajorCompactDeltasThread(i); });
+                     [this](size_t i, size_t j) { 
this->MajorCompactDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_slowreader_threads,
-                     [this](int i) { this->SlowReaderThread(i); });
+                     [this](size_t i, size_t j) { this->SlowReaderThread(i, 
j); });
   this->StartThreads(FLAGS_num_updater_threads,
-                     [this](int i) { this->UpdateThread(i); });
-  this->JoinThreads();
+                     [this](size_t i, size_t j) { this->UpdateThread(i, j); });
+  NO_FATALS(this->JoinThreads());
+
   LOG_TIMING(INFO, "Summing int32 column") {
-    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
+    uint64_t sum = 0;
+    ASSERT_OK(this->CountSum(shared_ptr<TimeSeries>(), &sum));
     LOG(INFO) << "Sum = " << sum;
   }
 
-  uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * 
FLAGS_num_insert_threads)
-          / FLAGS_num_insert_threads;
+  const uint64_t max_rows = this->ClampRowCount(
+      FLAGS_inserts_per_thread * FLAGS_num_insert_threads) / 
FLAGS_num_insert_threads;
 
-  this->VerifyTestRows(0, max_rows * FLAGS_num_insert_threads);
+  NO_FATALS(this->VerifyTestRows(0, max_rows * FLAGS_num_insert_threads));
 }
 
 // Start up a bunch of threads which repeatedly insert and delete the same
@@ -521,26 +561,27 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   FLAGS_flusher_initial_frequency_ms = 1;
   FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
   FLAGS_tablet_delta_store_minor_compact_max = 10;
-  this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
+  this->StartThreads(1,
+                     [this](size_t i, size_t j) { 
this->CollectStatisticsThread(i, j); });
   this->StartThreads(FLAGS_num_flush_threads,
-                     [this](int i) { this->FlushThread(i); });
+                     [this](size_t i, size_t j) { this->FlushThread(i, j); });
   this->StartThreads(FLAGS_num_compact_threads,
-                     [this](int i) { this->CompactThread(i); });
+                     [this](size_t i, size_t j) { this->CompactThread(i, j); 
});
   this->StartThreads(FLAGS_num_undo_delta_gc_threads,
-                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); 
});
+                     [this](size_t i, size_t j) { 
this->DeleteAncientUndoDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_flush_delta_threads,
-                     [this](int i) { this->FlushDeltasThread(i); });
+                     [this](size_t i, size_t j) { this->FlushDeltasThread(i, 
j); });
   this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
-                     [this](int i) { this->MinorCompactDeltasThread(i); });
+                     [this](size_t i, size_t j) { 
this->MinorCompactDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_major_compact_deltas_threads,
-                     [this](int i) { this->MajorCompactDeltasThread(i); });
+                     [this](size_t i, size_t j) { 
this->MajorCompactDeltasThread(i, j); });
   this->StartThreads(10,
-                     [this](int i) { this->DeleteAndReinsertCycleThread(i); });
+                     [this](size_t i, size_t j) { 
this->DeleteAndReinsertCycleThread(i, j); });
   this->StartThreads(10,
-                     [this](int i) { this->StubbornlyUpdateSameRowThread(i); 
});
+                     [this](size_t i, size_t j) { 
this->StubbornlyUpdateSameRowThread(i, j); });
 
   // Run very quickly in dev builds, longer in slow builds.
-  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
+  const float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
   Stopwatch sw;
   sw.start();
   while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
@@ -551,7 +592,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   // This is sort of a hack -- the flusher thread stops when it sees this
   // countdown latch go to 0.
   this->running_insert_count_.Reset(0);
-  this->JoinThreads();
+  NO_FATALS(this->JoinThreads());
 }
 
 // For tests where we want to use the hybrid clock. The hybrid clock is
@@ -560,7 +601,7 @@ template<class SETUP>
 class MultiThreadedHybridClockTabletTest : public 
MultiThreadedTabletTest<SETUP> {
  public:
   MultiThreadedHybridClockTabletTest()
-    : 
MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK) 
{
+      : 
MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK) 
{
   }
 };
 
@@ -579,28 +620,29 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest, 
UpdateNoMergeCompaction) {
 
   // Start up our background op threads, targeting the creation of delta files.
   this->StartThreads(FLAGS_num_flush_threads,
-                     [this](int i) { this->FlushThread(i); });
+                     [this](size_t i, size_t j) { this->FlushThread(i, j); });
   this->StartThreads(FLAGS_num_flush_delta_threads,
-                     [this](int i) { this->FlushDeltasThread(i); });
+                     [this](size_t i, size_t j) { this->FlushDeltasThread(i, 
j); });
   this->StartThreads(FLAGS_num_major_compact_deltas_threads,
-                     [this](int i) { this->MajorCompactDeltasThread(i); });
+                     [this](size_t i, size_t j) { 
this->MajorCompactDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_undo_delta_gc_threads,
-                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); 
});
+                     [this](size_t i, size_t j) { 
this->DeleteAncientUndoDeltasThread(i, j); });
   this->StartThreads(FLAGS_num_deleted_rowset_gc_threads,
-                     [this](int i) { 
this->DeleteAncientDeletedRowsetsThreads(i); });
+                     [this](size_t i, size_t j) {
+                       this->DeleteAncientDeletedRowsetsThreads(i, j); });
   // Start our workload threads, targeting the creation of deltas that we can
   // eventually GC.
   this->StartThreads(10,
-                     [this](int i) { this->DeleteAndReinsertCycleThread(i); });
+                     [this](size_t i, size_t j) { 
this->DeleteAndReinsertCycleThread(i, j); });
   this->StartThreads(10,
-                     [this](int i) { this->StubbornlyUpdateSameRowThread(i); 
});
+                     [this](size_t i, size_t j) { 
this->StubbornlyUpdateSameRowThread(i, j); });
 
   // For good measure, we'll also start a thread that scans.
   this->StartThreads(FLAGS_num_summer_threads,
-                     [this](int i) { this->SummerThread(i); });
+                     [this](size_t i, size_t j) { this->SummerThread(i, j); });
 
   // Run very quickly in dev builds, longer in slow builds.
-  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
+  const float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
   Stopwatch sw;
   sw.start();
   while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
@@ -611,7 +653,7 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest, 
UpdateNoMergeCompaction) {
   // This is sort of a hack -- the flusher thread stops when it sees this
   // countdown latch go to 0.
   this->running_insert_count_.Reset(0);
-  this->JoinThreads();
+  NO_FATALS(this->JoinThreads());
 }
 
 } // namespace tablet

Reply via email to