This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a0b7a5280 [tablet] report per-thread status in mt-tablet-test
a0b7a5280 is described below
commit a0b7a52800de1a03f66c963e25feee8c63041acd
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Sep 9 14:46:10 2024 -0700
[tablet] report per-thread status in mt-tablet-test
With this patch, mt-tablet-test now records per-thread status
and reports it at the very end of the scenario instead of crashing
at the first sight of a non-OK status in a per-thread routine.
This patch also contains other minor modifications to make the code
in mt-tablet-test more robust. This is in the context of KUDU-3465.
The motivation behind this update is to make sure that the sanitizers
(UBSAN/ASAN/TSAN, etc.) have a chance to report on an issue, if any.
Change-Id: I2e2cb8c3bbc9b12c5d45e603de1b0aeb14191f12
Reviewed-on: http://gerrit.cloudera.org:8080/21776
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/tablet/mt-tablet-test.cc | 294 ++++++++++++++++++++++----------------
1 file changed, 168 insertions(+), 126 deletions(-)
diff --git a/src/kudu/tablet/mt-tablet-test.cc
b/src/kudu/tablet/mt-tablet-test.cc
index 1e20dad9a..abf5359bb 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -17,13 +17,14 @@
#include <cstdint>
#include <cstdlib>
+#include <deque>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <type_traits>
-#include <vector>
+#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -52,8 +53,11 @@
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_graph.h"
+#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+// IWYU pragma: no_forward_declare kudu::tablet::MultiThreadedTabletTest
+
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_double(tablet_delta_store_major_compact_min_ratio);
DECLARE_int32(tablet_delta_store_minor_compact_max);
@@ -75,13 +79,15 @@ DEFINE_int32(num_major_compact_deltas_threads, 1,
DEFINE_int64(inserts_per_thread, 1000,
"Number of rows inserted by each inserter thread");
DEFINE_int32(tablet_test_flush_threshold_mb, 0, "Minimum memrowset size to
flush");
-DEFINE_double(flusher_backoff, 2.0f, "Ratio to backoff the flusher thread");
+DEFINE_double(flusher_backoff, 2.0F, "Ratio to backoff the flusher thread");
DEFINE_int32(flusher_initial_frequency_ms, 30, "Number of ms to wait between
flushes");
+using std::deque;
using std::shared_ptr;
using std::thread;
using std::unique_ptr;
-using std::vector;
+using strings::Substitute;
+using testing::UnitTest;
namespace kudu {
namespace tablet {
@@ -90,6 +96,14 @@ namespace {
const MonoDelta kBackgroundOpInterval = MonoDelta::FromMilliseconds(100);
} // anonymous namespace
+#define THR_RETURN_NOT_OK(tid, expr) \
+ do { \
+ if (auto s = (expr); !s.ok()) { \
+ threads_statuses_[(tid)] = std::move(s); \
+ return; \
+ } \
+ } while (false)
+
template<class SETUP>
class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
// Import some names from superclass, since C++ is stingy about
@@ -105,13 +119,13 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// Warm up code cache with all the projections we'll be using.
unique_ptr<RowwiseIterator> iter;
- CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
+ ASSERT_OK(tablet()->NewRowIterator(client_schema_, &iter));
uint64_t count;
- CHECK_OK(tablet()->CountRows(&count));
+ ASSERT_OK(tablet()->CountRows(&count));
const Schema* schema = tablet()->schema().get();
- ColumnSchema valcol = schema->column(schema->find_column("val"));
+ const ColumnSchema& valcol = schema->column(schema->find_column("val"));
valcol_projection_ = Schema({ valcol }, 0);
- CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
+ ASSERT_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
codegen::CompilationManager::GetSingleton()->Wait();
ts_collector_.StartDumperThread();
@@ -119,11 +133,11 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
explicit MultiThreadedTabletTest(TabletHarness::Options::ClockType
clock_type =
TabletHarness::Options::ClockType::LOGICAL_CLOCK)
- : TabletTestBase<SETUP>(clock_type),
- running_insert_count_(FLAGS_num_insert_threads),
-
ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name())
{}
+ : TabletTestBase<SETUP>(clock_type),
+ running_insert_count_(FLAGS_num_insert_threads),
+
ts_collector_(UnitTest::GetInstance()->current_test_info()->test_suite_name())
{}
- void InsertThread(int tid) {
+ void InsertThread(size_t group_tid, size_t /*tid*/) {
CountDownOnScopeExit dec_count(&running_insert_count_);
shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
@@ -137,37 +151,39 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << "
to prevent overflow";
}
- this->InsertTestRows(tid * max_rows,
+ this->InsertTestRows(group_tid * max_rows,
max_rows, 0,
inserts.get());
}
- void UpdateThread(int tid) {
- const Schema &schema = schema_;
+ void UpdateThread(size_t /*group_tid*/, size_t tid) {
+ // This reference is introduced to avoid compiler errors in the code below
+ // due to intricacies of typed tests.
+ const Schema& schema = schema_;
shared_ptr<TimeSeries> updates = ts_collector_.GetTimeSeries("updated");
- LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
+ LocalTabletWriter writer(tablet().get(), &this->client_schema_);
RowBlockMemory mem(1024);
- RowBlock block(&schema_, 1, &mem);
+ RowBlock block(&schema, 1, &mem);
faststring update_buf;
uint64_t updates_since_last_report = 0;
- int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
+ const int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
LOG(INFO) << "Update thread using schema: " << schema.ToString();
KuduPartialRow row(&client_schema_);
while (running_insert_count_.count() > 0) {
unique_ptr<RowwiseIterator> iter;
- CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
- CHECK_OK(iter->Init(nullptr));
+ THR_RETURN_NOT_OK(tid, tablet()->NewRowIterator(client_schema_, &iter));
+ THR_RETURN_NOT_OK(tid, iter->Init(nullptr));
while (iter->HasNext() && running_insert_count_.count() > 0) {
mem.Reset();
- CHECK_OK(iter->NextBlock(&block));
- CHECK_EQ(block.nrows(), 1);
+ THR_RETURN_NOT_OK(tid, iter->NextBlock(&block));
+ CHECK_EQ(1, block.nrows());
if (!block.selection_vector()->IsRowSelected(0)) {
// Don't try to update rows which aren't visible yet --
@@ -175,24 +191,18 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
continue;
}
-
RowBlockRow rb_row = block.row(0);
if (rand() % 10 == 7) {
// Increment the "val"
- const int32_t *old_val = schema.ExtractColumnFromRow<INT32>(rb_row,
col_idx);
+ const int32_t* old_val = schema.ExtractColumnFromRow<INT32>(rb_row,
col_idx);
// Issue an update. In the NullableValue setup, many of the rows
start with
// NULL here, so we have to check for it.
- int32_t new_val;
- if (old_val != nullptr) {
- new_val = *old_val + 1;
- } else {
- new_val = 0;
- }
+ const int32_t new_val = (old_val != nullptr) ? (*old_val + 1) : 0;
// Rebuild the key by extracting the cells from the row
setup_.BuildRowKeyFromExistingRow(&row, rb_row);
- CHECK_OK(row.SetInt32(col_idx, new_val));
- CHECK_OK(writer.Update(row));
+ THR_RETURN_NOT_OK(tid, row.SetInt32(col_idx, new_val));
+ THR_RETURN_NOT_OK(tid, writer.Update(row));
if (++updates_since_last_report >= 10) {
updates->AddValue(updates_since_last_report);
@@ -205,12 +215,12 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// Thread which repeatedly issues CountRows() and makes sure
// that the count doesn't go ever down.
- void CountThread(int tid) {
+ void CountThread(size_t /*group_tid*/, size_t tid) {
rowid_t last_count = 0;
while (running_insert_count_.count() > 0) {
uint64_t count;
- CHECK_OK(tablet()->CountRows(&count));
- ASSERT_GE(count, last_count);
+ THR_RETURN_NOT_OK(tid, tablet()->CountRows(&count));
+ CHECK_GE(count, last_count);
last_count = count;
}
}
@@ -218,23 +228,23 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// Thread which iterates slowly over the first 10% of the data.
// This is meant to test that outstanding iterators don't end up
// trying to reference already-freed memrowset memory.
- void SlowReaderThread(int /*tid*/) {
+ void SlowReaderThread(size_t /*group_tid*/, size_t tid) {
RowBlockMemory mem(32 * 1024);
RowBlock block(&schema_, 1, &mem);
- uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread *
FLAGS_num_insert_threads)
+ const uint64_t max_rows =
+ this->ClampRowCount(FLAGS_inserts_per_thread *
FLAGS_num_insert_threads)
/ FLAGS_num_insert_threads;
-
- int max_iters = FLAGS_num_insert_threads * max_rows / 10;
+ const int max_iters = FLAGS_num_insert_threads * max_rows / 10;
while (running_insert_count_.count() > 0) {
unique_ptr<RowwiseIterator> iter;
- CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
- CHECK_OK(iter->Init(nullptr));
+ THR_RETURN_NOT_OK(tid, tablet()->NewRowIterator(client_schema_, &iter));
+ THR_RETURN_NOT_OK(tid, iter->Init(nullptr));
for (int i = 0; i < max_iters && iter->HasNext(); i++) {
mem.Reset();
- CHECK_OK(iter->NextBlock(&block));
+ THR_RETURN_NOT_OK(tid, iter->NextBlock(&block));
if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
return;
@@ -243,19 +253,17 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
}
}
- void SummerThread(int tid) {
- shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries(
- "scanned");
-
+ void SummerThread(size_t /*group_tid*/, size_t tid) {
+ shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
while (running_insert_count_.count() > 0) {
- CountSum(scanned_ts);
+ THR_RETURN_NOT_OK(tid, CountSum(scanned_ts));
}
}
- uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
+ Status CountSum(const shared_ptr<TimeSeries>& scanned_ts, uint64_t* res =
nullptr) {
RowBlockMemory mem(1024); // unused, just scanning ints
- static const int kBufInts = 1024*1024 / 8;
+ constexpr const int kBufInts = 1024 * 1024 / 8;
RowBlock block(&valcol_projection_, kBufInts, &mem);
ColumnBlock column = block.column_block(0);
@@ -264,12 +272,12 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
int64_t sum = 0;
unique_ptr<RowwiseIterator> iter;
- CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
- CHECK_OK(iter->Init(nullptr));
+ RETURN_NOT_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
+ RETURN_NOT_OK(iter->Init(nullptr));
while (iter->HasNext()) {
mem.Reset();
- CHECK_OK(iter->NextBlock(&block));
+ RETURN_NOT_OK(iter->NextBlock(&block));
for (size_t j = 0; j < block.nrows(); j++) {
sum += *reinterpret_cast<const int32_t *>(column.cell_ptr(j));
@@ -289,10 +297,13 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
scanned_ts->AddValue(count_since_report);
}
- return sum;
+ if (res) {
+ *res = sum;
+ }
+ return Status::OK();
}
- void FlushThread(int /*tid*/) {
+ void FlushThread(size_t /*group_tid*/, size_t tid) {
// Start off with a very short wait time between flushes.
// But, especially in debug mode, this will only allow a few
// rows to get inserted between each flush, and the test will take
@@ -301,13 +312,13 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
while (running_insert_count_.count() > 0) {
if (tablet()->MemRowSetSize() > FLAGS_tablet_test_flush_threshold_mb *
1024 * 1024) {
- CHECK_OK(tablet()->Flush());
+ THR_RETURN_NOT_OK(tid, tablet()->Flush());
} else {
LOG(INFO) << "Not flushing, memrowset not very full";
}
if (tablet()->DeltaMemStoresSize() >
FLAGS_tablet_test_flush_threshold_mb * 1024 * 1024) {
- CHECK_OK(tablet()->FlushBiggestDMSForTests());
+ THR_RETURN_NOT_OK(tid, tablet()->FlushBiggestDMSForTests());
}
// Wait, unless the inserters are all done.
@@ -316,52 +327,53 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
}
}
- void FlushDeltasThread(int /*tid*/) {
+ void FlushDeltasThread(size_t /*group_tid*/, size_t tid) {
while (running_insert_count_.count() > 0) {
- CHECK_OK(tablet()->FlushBiggestDMSForTests());
+ THR_RETURN_NOT_OK(tid, tablet()->FlushBiggestDMSForTests());
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
- void MinorCompactDeltasThread(int /*tid*/) {
- CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
+ void MinorCompactDeltasThread(size_t /*group_tid*/, size_t tid) {
+ THR_RETURN_NOT_OK(tid, CompactDeltas(RowSet::MINOR_DELTA_COMPACTION));
}
- void MajorCompactDeltasThread(int /*tid*/) {
- CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
+ void MajorCompactDeltasThread(size_t /*group_tid*/, size_t tid) {
+ THR_RETURN_NOT_OK(tid, CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION));
}
- void CompactDeltas(RowSet::DeltaCompactionType type) {
+ Status CompactDeltas(RowSet::DeltaCompactionType type) {
while (running_insert_count_.count() > 0) {
VLOG(1) << "Compacting worst deltas";
- CHECK_OK(tablet()->CompactWorstDeltas(type));
+ RETURN_NOT_OK(tablet()->CompactWorstDeltas(type));
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
+ return Status::OK();
}
- void CompactThread(int /*tid*/) {
+ void CompactThread(size_t /*group_tid*/, size_t tid) {
while (running_insert_count_.count() > 0) {
- CHECK_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+ THR_RETURN_NOT_OK(tid, tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
- void DeleteAncientUndoDeltasThread(int /*tid*/) {
+ void DeleteAncientUndoDeltasThread(size_t /*group_tid*/, size_t tid) {
while (running_insert_count_.count() > 0) {
- MonoDelta time_budget = kBackgroundOpInterval;
+ const MonoDelta time_budget = kBackgroundOpInterval;
int64_t bytes_in_ancient_undos = 0;
- CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget,
&bytes_in_ancient_undos));
+ THR_RETURN_NOT_OK(tid, tablet()->InitAncientUndoDeltas(time_budget,
&bytes_in_ancient_undos));
VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient
delta undos";
int64_t blocks_deleted = 0;
int64_t bytes_deleted = 0;
- CHECK_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted,
&bytes_deleted));
+ THR_RETURN_NOT_OK(tid,
tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
if (blocks_deleted > 0) {
LOG(INFO) << "Deleted " << blocks_deleted << " blocks (" <<
bytes_deleted << " bytes) "
<< "of ancient delta undos";
@@ -374,30 +386,31 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// Thread that looks for rowsets that are ancient and fully deleted, GCing
// those that are.
- void DeleteAncientDeletedRowsetsThreads(int /*tid*/) {
+ void DeleteAncientDeletedRowsetsThreads(size_t /*group_tid*/, size_t tid) {
do {
- int64_t bytes_in_ancient_deleted_rowsets = 0;
-
CHECK_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes_in_ancient_deleted_rowsets));
+ // Number of bytes in ancient deleted rowsets.
+ int64_t adr_bytes = 0;
+ THR_RETURN_NOT_OK(tid,
tablet()->GetBytesInAncientDeletedRowsets(&adr_bytes));
VLOG(1) << Substitute("Found $0 bytes in ancient, fully deleted rowsets",
- bytes_in_ancient_deleted_rowsets);
- if (bytes_in_ancient_deleted_rowsets > 0) {
- CHECK_OK(tablet()->DeleteAncientDeletedRowsets());
+ adr_bytes);
+ if (adr_bytes > 0) {
+ THR_RETURN_NOT_OK(tid, tablet()->DeleteAncientDeletedRowsets());
LOG(INFO) << Substitute("Deleted $0 bytes found in ancient, fully
deleted rowsets",
- bytes_in_ancient_deleted_rowsets);
+ adr_bytes);
}
} while (!running_insert_count_.WaitFor(kBackgroundOpInterval));
}
// Thread which cycles between inserting and deleting a test row, each time
// with a different value.
- void DeleteAndReinsertCycleThread(int tid) {
+ void DeleteAndReinsertCycleThread(size_t group_tid, size_t tid) {
int32_t iteration = 0;
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
while (running_insert_count_.count() > 0) {
for (int i = 0; i < 100; i++) {
- CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
- CHECK_OK(this->DeleteTestRow(&writer, tid));
+ THR_RETURN_NOT_OK(tid, this->InsertTestRow(&writer, group_tid,
iteration++));
+ THR_RETURN_NOT_OK(tid, this->DeleteTestRow(&writer, group_tid));
}
}
}
@@ -406,15 +419,16 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// "not found" errors that might come back. This is used simultaneously with
// DeleteAndReinsertCycleThread to check for races where we might
accidentally
// succeed in UPDATING a ghost row.
- void StubbornlyUpdateSameRowThread(int tid) {
+ void StubbornlyUpdateSameRowThread(size_t group_tid, size_t tid) {
int32_t iteration = 0;
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
while (running_insert_count_.count() > 0) {
for (int i = 0; i < 100; i++) {
- Status s = this->UpdateTestRow(&writer, tid, iteration++);
+ Status s = this->UpdateTestRow(&writer, group_tid, iteration++);
if (!s.ok() && !s.IsNotFound()) {
// We expect "not found", but not any other errors.
- CHECK_OK(s);
+ threads_statuses_[tid] = std::move(s);
+ return;
}
}
}
@@ -423,7 +437,7 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
// Thread which wakes up periodically and collects metrics like memrowset
// size, etc. Eventually we should have a metrics system to collect things
// like this, but for now, this is what we've got.
- void CollectStatisticsThread(int tid) {
+ void CollectStatisticsThread(size_t /*group_tid*/, size_t /*tid*/) {
shared_ptr<TimeSeries> num_rowsets_ts = ts_collector_.GetTimeSeries(
"num_rowsets");
shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries(
@@ -443,19 +457,42 @@ class MultiThreadedTabletTest : public
TabletTestBase<SETUP> {
}
}
- void StartThreads(int n_threads, const std::function<void(int)>& function) {
- for (int i = 0; i < n_threads; i++) {
- threads_.emplace_back([=]() { function(i); });
+ // Start a group of 'threads_num' threads, each running the specified
+ // 'function' routine. The first parameter of the function is the identifier
+ // of the thread in its group, the second parameter is the global identifier
+ // of the thread among all the threads started by the test fixture. The
+ // latter is used as the index of the corresponding data structures
+ // in the 'threads_' and the 'threads_statuses_' containers.
+ void StartThreads(size_t threads_num,
+ const std::function<void(size_t, size_t)>& function) {
+ DCHECK_EQ(threads_.size(), threads_statuses_.size());
+ const size_t idx_offset = threads_.size();
+ for (size_t idx = 0; idx < threads_num; ++idx) {
+ threads_statuses_.emplace_back(Status::OK());
+ threads_.emplace_back([=]() { function(idx, idx_offset + idx); });
}
}
void JoinThreads() {
+ // First, allow all the threads to join.
for (auto& t : threads_) {
t.join();
}
+
+ // Now, check if there were errors reported by any of the threads.
+ const auto size = threads_statuses_.size();
+ DCHECK_EQ(threads_.size(), size);
+ for (auto idx = 0; idx < size; ++idx) {
+ SCOPED_TRACE(Substitute(
+ "$0 thread ID $1",
UnitTest::GetInstance()->current_test_info()->name(), idx));
+ // Using EXPECT_OK() to allow for all the non-OK statuses to be reported.
+ EXPECT_OK(threads_statuses_[idx]);
+ }
}
- vector<thread> threads_;
+ deque<thread> threads_;
+ deque<Status> threads_statuses_;
+
CountDownLatch running_insert_count_;
// Projection with only an int column.
@@ -477,39 +514,42 @@ TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) {
}
// Spawn a bunch of threads, each of which will do updates.
- this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
+ this->StartThreads(1,
+ [this](size_t i, size_t j) {
this->CollectStatisticsThread(i, j); });
this->StartThreads(FLAGS_num_insert_threads,
- [this](int i) { this->InsertThread(i); });
+ [this](size_t i, size_t j) { this->InsertThread(i, j); });
this->StartThreads(FLAGS_num_counter_threads,
- [this](int i) { this->CountThread(i); });
+ [this](size_t i, size_t j) { this->CountThread(i, j); });
this->StartThreads(FLAGS_num_summer_threads,
- [this](int i) { this->SummerThread(i); });
+ [this](size_t i, size_t j) { this->SummerThread(i, j); });
this->StartThreads(FLAGS_num_flush_threads,
- [this](int i) { this->FlushThread(i); });
+ [this](size_t i, size_t j) { this->FlushThread(i, j); });
this->StartThreads(FLAGS_num_compact_threads,
- [this](int i) { this->CompactThread(i); });
+ [this](size_t i, size_t j) { this->CompactThread(i, j);
});
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
- [this](int i) { this->DeleteAncientUndoDeltasThread(i);
});
+ [this](size_t i, size_t j) {
this->DeleteAncientUndoDeltasThread(i, j); });
this->StartThreads(FLAGS_num_flush_delta_threads,
- [this](int i) { this->FlushDeltasThread(i); });
+ [this](size_t i, size_t j) { this->FlushDeltasThread(i,
j); });
this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
- [this](int i) { this->MinorCompactDeltasThread(i); });
+ [this](size_t i, size_t j) {
this->MinorCompactDeltasThread(i, j); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
- [this](int i) { this->MajorCompactDeltasThread(i); });
+ [this](size_t i, size_t j) {
this->MajorCompactDeltasThread(i, j); });
this->StartThreads(FLAGS_num_slowreader_threads,
- [this](int i) { this->SlowReaderThread(i); });
+ [this](size_t i, size_t j) { this->SlowReaderThread(i,
j); });
this->StartThreads(FLAGS_num_updater_threads,
- [this](int i) { this->UpdateThread(i); });
- this->JoinThreads();
+ [this](size_t i, size_t j) { this->UpdateThread(i, j); });
+ NO_FATALS(this->JoinThreads());
+
LOG_TIMING(INFO, "Summing int32 column") {
- uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
+ uint64_t sum = 0;
+ ASSERT_OK(this->CountSum(shared_ptr<TimeSeries>(), &sum));
LOG(INFO) << "Sum = " << sum;
}
- uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread *
FLAGS_num_insert_threads)
- / FLAGS_num_insert_threads;
+ const uint64_t max_rows = this->ClampRowCount(
+ FLAGS_inserts_per_thread * FLAGS_num_insert_threads) /
FLAGS_num_insert_threads;
- this->VerifyTestRows(0, max_rows * FLAGS_num_insert_threads);
+ NO_FATALS(this->VerifyTestRows(0, max_rows * FLAGS_num_insert_threads));
}
// Start up a bunch of threads which repeatedly insert and delete the same
@@ -521,26 +561,27 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
FLAGS_flusher_initial_frequency_ms = 1;
FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
FLAGS_tablet_delta_store_minor_compact_max = 10;
- this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
+ this->StartThreads(1,
+ [this](size_t i, size_t j) {
this->CollectStatisticsThread(i, j); });
this->StartThreads(FLAGS_num_flush_threads,
- [this](int i) { this->FlushThread(i); });
+ [this](size_t i, size_t j) { this->FlushThread(i, j); });
this->StartThreads(FLAGS_num_compact_threads,
- [this](int i) { this->CompactThread(i); });
+ [this](size_t i, size_t j) { this->CompactThread(i, j);
});
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
- [this](int i) { this->DeleteAncientUndoDeltasThread(i);
});
+ [this](size_t i, size_t j) {
this->DeleteAncientUndoDeltasThread(i, j); });
this->StartThreads(FLAGS_num_flush_delta_threads,
- [this](int i) { this->FlushDeltasThread(i); });
+ [this](size_t i, size_t j) { this->FlushDeltasThread(i,
j); });
this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
- [this](int i) { this->MinorCompactDeltasThread(i); });
+ [this](size_t i, size_t j) {
this->MinorCompactDeltasThread(i, j); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
- [this](int i) { this->MajorCompactDeltasThread(i); });
+ [this](size_t i, size_t j) {
this->MajorCompactDeltasThread(i, j); });
this->StartThreads(10,
- [this](int i) { this->DeleteAndReinsertCycleThread(i); });
+ [this](size_t i, size_t j) {
this->DeleteAndReinsertCycleThread(i, j); });
this->StartThreads(10,
- [this](int i) { this->StubbornlyUpdateSameRowThread(i);
});
+ [this](size_t i, size_t j) {
this->StubbornlyUpdateSameRowThread(i, j); });
// Run very quickly in dev builds, longer in slow builds.
- float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
+ const float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
Stopwatch sw;
sw.start();
while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
@@ -551,7 +592,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
// This is sort of a hack -- the flusher thread stops when it sees this
// countdown latch go to 0.
this->running_insert_count_.Reset(0);
- this->JoinThreads();
+ NO_FATALS(this->JoinThreads());
}
// For tests where we want to use the hybrid clock. The hybrid clock is
@@ -560,7 +601,7 @@ template<class SETUP>
class MultiThreadedHybridClockTabletTest : public
MultiThreadedTabletTest<SETUP> {
public:
MultiThreadedHybridClockTabletTest()
- :
MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK)
{
+ :
MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK)
{
}
};
@@ -579,28 +620,29 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest,
UpdateNoMergeCompaction) {
// Start up our background op threads, targeting the creation of delta files.
this->StartThreads(FLAGS_num_flush_threads,
- [this](int i) { this->FlushThread(i); });
+ [this](size_t i, size_t j) { this->FlushThread(i, j); });
this->StartThreads(FLAGS_num_flush_delta_threads,
- [this](int i) { this->FlushDeltasThread(i); });
+ [this](size_t i, size_t j) { this->FlushDeltasThread(i,
j); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
- [this](int i) { this->MajorCompactDeltasThread(i); });
+ [this](size_t i, size_t j) {
this->MajorCompactDeltasThread(i, j); });
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
- [this](int i) { this->DeleteAncientUndoDeltasThread(i);
});
+ [this](size_t i, size_t j) {
this->DeleteAncientUndoDeltasThread(i, j); });
this->StartThreads(FLAGS_num_deleted_rowset_gc_threads,
- [this](int i) {
this->DeleteAncientDeletedRowsetsThreads(i); });
+ [this](size_t i, size_t j) {
+ this->DeleteAncientDeletedRowsetsThreads(i, j); });
// Start our workload threads, targeting the creation of deltas that we can
// eventually GC.
this->StartThreads(10,
- [this](int i) { this->DeleteAndReinsertCycleThread(i); });
+ [this](size_t i, size_t j) {
this->DeleteAndReinsertCycleThread(i, j); });
this->StartThreads(10,
- [this](int i) { this->StubbornlyUpdateSameRowThread(i);
});
+ [this](size_t i, size_t j) {
this->StubbornlyUpdateSameRowThread(i, j); });
// For good measure, we'll also start a thread that scans.
this->StartThreads(FLAGS_num_summer_threads,
- [this](int i) { this->SummerThread(i); });
+ [this](size_t i, size_t j) { this->SummerThread(i, j); });
// Run very quickly in dev builds, longer in slow builds.
- float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
+ const float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
Stopwatch sw;
sw.start();
while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
@@ -611,7 +653,7 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest,
UpdateNoMergeCompaction) {
// This is sort of a hack -- the flusher thread stops when it sees this
// countdown latch go to 0.
this->running_insert_count_.Reset(0);
- this->JoinThreads();
+ NO_FATALS(this->JoinThreads());
}
} // namespace tablet