This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit b9c429c6831cf7aa948490136eeedc984cf7e4c1 Author: oclarms <[email protected]> AuthorDate: Wed Aug 14 11:43:51 2019 +0800 [tablet] Fixed the bug of DeltaTracker::CountDeletedRows When Tablet.CountLiveRows was called in a multi-thread case, there's a chance we'll see the following failure. User stack: F0814 12:05:51.975797 96375 diskrowset.cc:759] Check failed: *count >= 0 (-3 vs. 0) *** Check failure stack trace: *** *** Aborted at 1565755551 (unix time) try "date -d @1565755551" if you are using GNU date *** PC: @ 0x7f9bd20425f7 __GI_raise *** SIGABRT (@0x70900017872) received by PID 96370 (TID 0x7f9bce2d7700) from PID 96370; stack trace: *** @ 0x7f9bdaff6100 (unknown) @ 0x7f9bd20425f7 __GI_raise @ 0x7f9bd2043ce8 __GI_abort @ 0x7f9bd4540c99 google::logging_fail() @ 0x7f9bd454246d google::LogMessage::Fail() @ 0x7f9bd45443c3 google::LogMessage::SendToLog() @ 0x7f9bd4541fc9 google::LogMessage::Flush() @ 0x7f9bd4544d4f google::LogMessageFatal::~LogMessageFatal() @ 0x7f9bddc9aabe kudu::tablet::DiskRowSet::CountLiveRows() @ 0x7f9bddbdeb79 kudu::tablet::Tablet::CountLiveRows() @ 0x49891f kudu::tablet::MultiThreadedTabletTest<>::CollectStatisticsThread() @ 0x4ae34b boost::_mfi::mf1<>::operator()() @ 0x4add25 boost::_bi::list2<>::operator()<>() @ 0x4acfe9 boost::_bi::bind_t<>::operator()() @ 0x4ac8a6 boost::detail::function::void_function_obj_invoker0<>::invoke() @ 0x7f9bd7116492 boost::function0<>::operator()() @ 0x7f9bd62e5324 kudu::Thread::SuperviseThread() @ 0x7f9bdafeedc5 start_thread @ 0x7f9bd2103ced __clone This is because DeltaTracker lacks lock protection when it modifies the number of live rows in rowset_metadata_ and resets deleted_row_count_: the two steps are not performed atomically. As a result, deleted_row_count_ can be double-counted when calculating the number of live rows of a DRS. Consider the following sequence: | T1 | T2 |---------- |---------- |+ In DT::Flush | | Take compact_flush_lock_ (excl) | | Take component_lock_ (excl) | | deleted_row_count_ = ... 
| | Release component_lock_ | | + In DT::FlushDMS | | Call RSMD::IncrementLiveRows | | --> RSMD::live_row_count - deleted_row_count_ | |+ In DRS::CountLiveRows | | Take component_lock_ (shared) | | Call RSMD::live_row_count - DT::CountDeletedRows | | --> RSMD::live_row_count - deleted_row_count_ | | --> we double counted deleted_row_count_ !!! | Take component_lock_ (excl) | | deleted_row_count_ = 0 | | Release component_lock_ | | Release compact_flush_lock_ | Change-Id: I9bb4456123087778c9dc799777c5990938a84fdf Reviewed-on: http://gerrit.cloudera.org:8080/14061 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Adar Dembo <[email protected]> --- src/kudu/integration-tests/raft_consensus-itest.cc | 36 ++++- src/kudu/integration-tests/test_workload.cc | 153 ++++++++++++--------- src/kudu/integration-tests/test_workload.h | 20 +++ src/kudu/tablet/delta_tracker.cc | 15 +- src/kudu/tablet/delta_tracker.h | 2 +- src/kudu/tablet/diskrowset.cc | 2 +- src/kudu/tablet/metadata-test.cc | 2 +- src/kudu/tablet/mt-tablet-test.cc | 9 +- src/kudu/tablet/rowset_metadata.cc | 10 +- src/kudu/tablet/rowset_metadata.h | 8 +- 10 files changed, 177 insertions(+), 80 deletions(-) diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 4e77708..0c2c114 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -220,7 +220,7 @@ class RaftConsensusITest : public RaftConsensusITestBase { void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers, const string& leader_uuid); - void CreateClusterForCrashyNodesTests(); + void CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags = {}); void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert); // Prepare for a test where a single replica of a 3-server cluster is left @@ -528,7 +528,7 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites( 
MonoDelta::FromSeconds(10))); } -void RaftConsensusITest::CreateClusterForCrashyNodesTests() { +void RaftConsensusITest::CreateClusterForCrashyNodesTests(vector<string> extra_ts_flags) { if (AllowSlowTests()) { FLAGS_num_tablet_servers = 7; FLAGS_num_replicas = 7; @@ -555,6 +555,8 @@ void RaftConsensusITest::CreateClusterForCrashyNodesTests() { // log area. ts_flags.emplace_back("--log_preallocate_segments=false"); + ts_flags.insert(ts_flags.end(), extra_ts_flags.begin(), extra_ts_flags.end()); + NO_FATALS(CreateCluster("raft_consensus-itest-crashy-nodes-cluster", std::move(ts_flags))); } @@ -608,7 +610,7 @@ void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_ NO_FATALS(v.CheckCluster()); NO_FATALS(v.CheckRowCount(workload->table_name(), ClusterVerifier::EXACTLY, - workload->rows_inserted())); + workload->rows_inserted() - workload->rows_deleted())); } void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) { @@ -952,6 +954,34 @@ TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) { NO_FATALS(DoTestCrashyNodes(&workload, 300)); } +// The same crashy nodes test as above but the keys will be deleted after insertion. +TEST_F(RaftConsensusITest, InsertAndDeleteWithCrashyNodes) { + vector<string> extra_ts_flags = { + "--flush_threshold_mb=0", + "--flush_threshold_secs=1", + "--maintenance_manager_polling_interval_ms=10", + "--heartbeat_interval_ms=10", + "--update_tablet_stats_interval_ms=10", + }; + NO_FATALS(CreateClusterForCrashyNodesTests(std::move(extra_ts_flags))); + + // If the AllowSlowTests is true, test the scenario that deleting data on DRS. + // Otherwise, test deleting data on MRS. + int32_t write_interval_millis = 0; + int32_t write_batch_size = 5; + if (AllowSlowTests()) { + // Wait for MRS to be flushed. + write_interval_millis = 1000; + // Decrease the number of rows per batch to generate more DRSs. 
+ write_batch_size = 1; + } + + TestWorkload workload(cluster_.get()); + workload.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS_WITH_DELETE); + workload.set_write_interval_millis(write_interval_millis); + workload.set_write_batch_size(write_batch_size); + NO_FATALS(DoTestCrashyNodes(&workload, 100)); +} TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { int kNumElections = FLAGS_num_replicas; diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc index 7b9863e..eb2bb70 100644 --- a/src/kudu/integration-tests/test_workload.cc +++ b/src/kudu/integration-tests/test_workload.cc @@ -17,7 +17,6 @@ #include "kudu/integration-tests/test_workload.h" -#include <cstddef> #include <memory> #include <ostream> @@ -43,6 +42,8 @@ namespace kudu { using client::KuduClient; using client::KuduColumnSchema; +using client::KuduDelete; +using client::KuduError; using client::KuduInsert; using client::KuduScanBatch; using client::KuduScanner; @@ -54,6 +55,8 @@ using client::KuduUpdate; using client::sp::shared_ptr; using cluster::MiniCluster; +using std::vector; + const char* const TestWorkload::kDefaultTableName = "test-workload"; TestWorkload::TestWorkload(MiniCluster* cluster) @@ -65,6 +68,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster) // high-stress workloads. read_timeout_millis_(60000), write_batch_size_(50), + write_interval_millis_(0), write_timeout_millis_(20000), timeout_allowed_(false), not_found_allowed_(false), @@ -77,6 +81,7 @@ TestWorkload::TestWorkload(MiniCluster* cluster) start_latch_(0), should_run_(false), rows_inserted_(0), + rows_deleted_(0), batches_completed_(0), sequential_key_gen_(0) { // Make the default write pattern random row inserts. @@ -87,11 +92,11 @@ TestWorkload::~TestWorkload() { StopAndJoin(); } -void TestWorkload::set_schema(const client::KuduSchema& schema) { +void TestWorkload::set_schema(const KuduSchema& schema) { // Do some sanity checks on the schema. 
They reflect how the rest of // TestWorkload is going to use the schema. CHECK_GT(schema.num_columns(), 0) << "Schema should have at least one column"; - std::vector<int> key_indexes; + vector<int> key_indexes; schema.GetPrimaryKeyColumnIndexes(&key_indexes); CHECK_LE(1, key_indexes.size()) << "Schema should have at least one key column"; CHECK_EQ(0, key_indexes[0]) << "Schema's first key column should be index 0"; @@ -137,73 +142,69 @@ void TestWorkload::WriteThread() { while (should_run_.Load()) { int inserted = 0; - for (int i = 0; i < write_batch_size_; i++) { - if (write_pattern_ == UPDATE_ONE_ROW) { - gscoped_ptr<KuduUpdate> update(table->NewUpdate()); - KuduPartialRow* row = update->mutable_row(); - tools::GenerateDataForRow(schema_, 0, &rng_, row); - CHECK_OK(session->Apply(update.release())); - } else { - gscoped_ptr<KuduInsert> insert(table->NewInsert()); - KuduPartialRow* row = insert->mutable_row(); - int32_t key; - if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) { - key = sequential_key_gen_.Increment(); + int deleted = 0; + vector<int32_t> keys; + // Write insert or update row to cluster. 
+ { + for (int i = 0; i < write_batch_size_; i++) { + if (write_pattern_ == UPDATE_ONE_ROW) { + gscoped_ptr<KuduUpdate> update(table->NewUpdate()); + KuduPartialRow* row = update->mutable_row(); + tools::GenerateDataForRow(schema_, 0, &rng_, row); + CHECK_OK(session->Apply(update.release())); } else { - key = rng_.Next(); - if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) { - key %= kNumRowsForDuplicateKeyWorkload; + gscoped_ptr<KuduInsert> insert(table->NewInsert()); + KuduPartialRow* row = insert->mutable_row(); + int32_t key; + if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) { + key = sequential_key_gen_.Increment(); + } else { + key = rng_.Next(); + if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) { + key %= kNumRowsForDuplicateKeyWorkload; + } } - } - - tools::GenerateDataForRow(schema_, key, &rng_, row); - if (payload_bytes_) { - // Note: overriding payload_bytes_ requires the "simple" schema. - std::string test_payload(payload_bytes_.get(), '0'); - CHECK_OK(row->SetStringCopy(2, test_payload)); - } - CHECK_OK(session->Apply(insert.release())); - inserted++; - } - } - - Status s = session->Flush(); - - if (PREDICT_FALSE(!s.ok())) { - std::vector<client::KuduError*> errors; - ElementDeleter d(&errors); - bool overflow; - session->GetPendingErrors(&errors, &overflow); - CHECK(!overflow); - for (const client::KuduError* e : errors) { - if (timeout_allowed_ && e->status().IsTimedOut()) { - continue; - } - - if (not_found_allowed_ && e->status().IsNotFound()) { - continue; - } + keys.push_back(key); - if (already_present_allowed_ && e->status().IsAlreadyPresent()) { - continue; - } - - if (network_error_allowed_ && e->status().IsNetworkError()) { - continue; - } + tools::GenerateDataForRow(schema_, key, &rng_, row); + if (payload_bytes_) { + // Note: overriding payload_bytes_ requires the "simple" schema. 
+ std::string test_payload(payload_bytes_.get(), '0'); + CHECK_OK(row->SetStringCopy(2, test_payload)); + } + CHECK_OK(session->Apply(insert.release())); - if (remote_error_allowed_ && e->status().IsRemoteError()) { - continue; + inserted++; } - - LOG(FATAL) << e->status().ToString(); } - inserted -= errors.size(); + Status s = session->Flush(); + if (PREDICT_FALSE(!s.ok())) { + inserted -= GetNumberOfErrors(session.get()); + } + if (inserted > 0) { + rows_inserted_.IncrementBy(inserted); + batches_completed_.Increment(); + } } - - if (inserted > 0) { - rows_inserted_.IncrementBy(inserted); - batches_completed_.Increment(); + if (PREDICT_FALSE(write_interval_millis_ > 0)) { + SleepFor(MonoDelta::FromMilliseconds(write_interval_millis_)); + } + // Write delete row to cluster. + if (write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE) { + for (auto key : keys) { + gscoped_ptr<KuduDelete> op(table->NewDelete()); + KuduPartialRow* row = op->mutable_row(); + tools::WriteValueToColumn(schema_, 0, key, row); + CHECK_OK(session->Apply(op.release())); + deleted++; + } + Status s = session->Flush(); + if (PREDICT_FALSE(!s.ok())) { + deleted -= GetNumberOfErrors(session.get()); + } + if (deleted > 0) { + rows_deleted_.IncrementBy(deleted); + } } } } @@ -220,7 +221,10 @@ void TestWorkload::ReadThread() { CHECK_OK(scanner.SetTimeoutMillis(read_timeout_millis_)); CHECK_OK(scanner.SetFaultTolerant()); - int64_t expected_row_count = rows_inserted_.Load(); + // Note: when INSERT_RANDOM_ROWS_WITH_DELETE is used, ReadThread doesn't really verify + // anything except that a scan works. + int64_t expected_row_count = write_pattern_ == INSERT_RANDOM_ROWS_WITH_DELETE ? 
+ 0 : rows_inserted_.Load(); size_t row_count = 0; CHECK_OK(scanner.Open()); @@ -234,6 +238,25 @@ void TestWorkload::ReadThread() { } } +size_t TestWorkload::GetNumberOfErrors(KuduSession* session) { + vector<KuduError*> errors; + ElementDeleter d(&errors); + bool overflow; + session->GetPendingErrors(&errors, &overflow); + CHECK(!overflow); + for (const KuduError* e : errors) { + if ((timeout_allowed_ && e->status().IsTimedOut()) || + (not_found_allowed_ && e->status().IsNotFound()) || + (already_present_allowed_ && e->status().IsAlreadyPresent()) || + (network_error_allowed_ && e->status().IsNetworkError()) || + (remote_error_allowed_ && e->status().IsRemoteError())) { + continue; + } + LOG(FATAL) << e->status().ToString(); + } + return errors.size(); +} + shared_ptr<KuduClient> TestWorkload::CreateClient() { CHECK_OK(cluster_->CreateClient(&client_builder_, &client_)); return client_; @@ -259,7 +282,7 @@ void TestWorkload::Setup() { if (!table_exists) { // Create split rows. - std::vector<const KuduPartialRow*> splits; + vector<const KuduPartialRow*> splits; for (int i = 1; i < num_tablets_; i++) { KuduPartialRow* r = schema_.NewRow(); CHECK_OK(r->SetInt32("key", MathLimits<int32_t>::kMax / num_tablets_ * i)); diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h index 689f139..00a35af 100644 --- a/src/kudu/integration-tests/test_workload.h +++ b/src/kudu/integration-tests/test_workload.h @@ -17,6 +17,7 @@ #ifndef KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H #define KUDU_INTEGRATION_TESTS_TEST_WORKLOAD_H +#include <cstddef> #include <cstdint> #include <ostream> #include <string> @@ -76,6 +77,10 @@ class TestWorkload { write_batch_size_ = s; } + void set_write_interval_millis(int t) { + write_interval_millis_ = t; + } + void set_client_default_rpc_timeout_millis(int t) { client_builder_.default_rpc_timeout(MonoDelta::FromMilliseconds(t)); } @@ -149,6 +154,11 @@ class TestWorkload { // duplicate, but with 32-bit keys, they 
won't be frequent. INSERT_RANDOM_ROWS, + // Insert random rows, then delete them. + // This may cause an occasional duplicate, but with 32-bit keys, they won't be frequent. + // This requires two flush operations. + INSERT_RANDOM_ROWS_WITH_DELETE, + // All threads generate updates against a single row. UPDATE_ONE_ROW, @@ -170,6 +180,7 @@ class TestWorkload { set_already_present_allowed(true); break; case INSERT_RANDOM_ROWS: + case INSERT_RANDOM_ROWS_WITH_DELETE: case UPDATE_ONE_ROW: case INSERT_SEQUENTIAL_ROWS: set_already_present_allowed(false); @@ -199,6 +210,12 @@ class TestWorkload { return rows_inserted_.Load(); } + // Return the number of rows deleted so far. This may be called either + // during or after the write workload. + int64_t rows_deleted() const { + return rows_deleted_.Load(); + } + // Return the number of batches in which we have successfully inserted at // least one row. // NOTE: it is not safe to assume that this is exactly equal to the number @@ -214,6 +231,7 @@ class TestWorkload { void OpenTable(client::sp::shared_ptr<client::KuduTable>* table); void WriteThread(); void ReadThread(); + size_t GetNumberOfErrors(client::KuduSession* session); cluster::MiniCluster* cluster_; client::KuduClientBuilder client_builder_; @@ -225,6 +243,7 @@ class TestWorkload { int num_read_threads_; int read_timeout_millis_; int write_batch_size_; + int write_interval_millis_; int write_timeout_millis_; bool timeout_allowed_; bool not_found_allowed_; @@ -241,6 +260,7 @@ class TestWorkload { CountDownLatch start_latch_; AtomicBool should_run_; AtomicInt<int64_t> rows_inserted_; + AtomicInt<int64_t> rows_deleted_; AtomicInt<int64_t> batches_completed_; AtomicInt<int32_t> sequential_key_gen_; diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc index 4ea8684..770e3aa 100644 --- a/src/kudu/tablet/delta_tracker.cc +++ b/src/kudu/tablet/delta_tracker.cc @@ -744,10 +744,16 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms, dfr)); 
VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read"; - // Merge the deleted row count of the old DMS to the RowSetMetadata if necessary. - rowset_metadata_->IncrementLiveRows(-deleted_row_count_); - - RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id)); + { + // Merge the deleted row count of the old DMS to the RowSetMetadata + // and reset deleted_row_count_ should be atomic, so we lock the + // component_lock_ in exclusive mode. + std::lock_guard<rw_spinlock> lock(component_lock_); + RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), + deleted_row_count_, + block_id)); + deleted_row_count_ = 0; + } if (flush_type == FLUSH_METADATA) { RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(), Substitute("Unable to commit Delta block metadata for: $0", @@ -814,7 +820,6 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_ CHECK_EQ(redo_delta_stores_[idx], old_dms) << "Another thread modified the delta store list during flush"; redo_delta_stores_[idx] = dfr; - deleted_row_count_ = 0; } return Status::OK(); diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h index 072b6e6..a4ea0a1 100644 --- a/src/kudu/tablet/delta_tracker.h +++ b/src/kudu/tablet/delta_tracker.h @@ -373,7 +373,7 @@ class DeltaTracker { // TODO(perf): this needs to be more fine grained mutable Mutex compact_flush_lock_; - // Number of deleted rows for a DMS that is currently being flushed. + // Number of deleted rows for a DMS that is currently being flushed. // When the flush completes, this is merged into the RowSetMetadata // and reset. 
int64_t deleted_row_count_; diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index 8143fc3..b2395a9 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -467,7 +467,7 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() { s = cur_redo_writer_->FinishAndReleaseBlock(block_transaction_.get()); if (!s.IsAborted()) { RETURN_NOT_OK(s); - cur_drs_metadata_->CommitRedoDeltaDataBlock(0, cur_redo_ds_block_id_); + cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_); } else { DCHECK_EQ(cur_redo_delta_stats->min_timestamp(), Timestamp::kMax); } diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc index ec77b00..e67529d 100644 --- a/src/kudu/tablet/metadata-test.cc +++ b/src/kudu/tablet/metadata-test.cc @@ -46,7 +46,7 @@ class MetadataTest : public KuduTest { tablet_meta_ = new TabletMetadata(nullptr, "fake-tablet"); CHECK_OK(RowSetMetadata::CreateNew(tablet_meta_.get(), 0, &meta_)); for (int i = 0; i < all_blocks_.size(); i++) { - CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, all_blocks_[i])); + CHECK_OK(meta_->CommitRedoDeltaDataBlock(i, 0, all_blocks_[i])); } CHECK_EQ(4, meta_->redo_delta_blocks().size()); } diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc index 2e3afaa..dab3913 100644 --- a/src/kudu/tablet/mt-tablet-test.cc +++ b/src/kudu/tablet/mt-tablet-test.cc @@ -35,6 +35,7 @@ #include "kudu/common/rowblock.h" #include "kudu/common/rowid.h" #include "kudu/common/schema.h" +#include "kudu/gutil/basictypes.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/local_tablet_writer.h" @@ -408,13 +409,18 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { "num_rowsets"); shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries( "memrowset_kb"); + shared_ptr<TimeSeries> num_live_rows_ts = ts_collector_.GetTimeSeries( + "num_live_rows"); while 
(running_insert_count_.count() > 0) { num_rowsets_ts->SetValue(tablet()->num_rowsets()); memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0); + int64_t num_live_rows; + ignore_result(tablet()->CountLiveRows(&num_live_rows)); + num_live_rows_ts->SetValue(num_live_rows); // Wait, unless the inserters are all done. - running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(250)); + running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(10)); } } @@ -491,6 +497,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) { FLAGS_flusher_initial_frequency_ms = 1; FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f; FLAGS_tablet_delta_store_minor_compact_max = 10; + this->StartThreads(1, &TestFixture::CollectStatisticsThread); this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread); this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread); this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread); diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc index 6bd19ce..6c9705c 100644 --- a/src/kudu/tablet/rowset_metadata.cc +++ b/src/kudu/tablet/rowset_metadata.cc @@ -187,10 +187,12 @@ void RowSetMetadata::SetColumnDataBlocks(const std::map<ColumnId, BlockId>& bloc } Status RowSetMetadata::CommitRedoDeltaDataBlock(int64_t dms_id, + int64_t num_deleted_rows, const BlockId& block_id) { std::lock_guard<LockType> l(lock_); last_durable_redo_dms_id_ = dms_id; redo_delta_blocks_.push_back(block_id); + IncrementLiveRowsUnlocked(-num_deleted_rows); return Status::OK(); } @@ -278,14 +280,18 @@ void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update, blocks_by_col_id_.shrink_to_fit(); } -void RowSetMetadata::IncrementLiveRows(int64_t row_count) { +void RowSetMetadata::IncrementLiveRowsUnlocked(int64_t row_count) { if (tablet_metadata_->supports_live_row_count() && row_count != 0) { - std::lock_guard<LockType> l(lock_); live_row_count_ += row_count; 
DCHECK_GE(live_row_count_, 0); } } +void RowSetMetadata::IncrementLiveRows(int64_t row_count) { + std::lock_guard<LockType> l(lock_); + IncrementLiveRowsUnlocked(row_count); +} + int64_t RowSetMetadata::live_row_count() const { std::lock_guard<LockType> l(lock_); DCHECK_GE(live_row_count_, 0); diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h index 94af6f3..7f67b63 100644 --- a/src/kudu/tablet/rowset_metadata.h +++ b/src/kudu/tablet/rowset_metadata.h @@ -117,7 +117,11 @@ class RowSetMetadata { void SetColumnDataBlocks(const std::map<ColumnId, BlockId>& blocks_by_col_id); - Status CommitRedoDeltaDataBlock(int64_t dms_id, const BlockId& block_id); + // Atomically commit the new redo delta block to RowSetMetadata. + // This atomic operation includes updates to last_durable_redo_dms_id_ and live_row_count_. + Status CommitRedoDeltaDataBlock(int64_t dms_id, + int64_t num_deleted_rows, + const BlockId& block_id); Status CommitUndoDeltaDataBlock(const BlockId& block_id); @@ -256,6 +260,8 @@ class RowSetMetadata { Status InitFromPB(const RowSetDataPB& pb); + void IncrementLiveRowsUnlocked(int64_t row_count); + TabletMetadata* const tablet_metadata_; bool initted_; int64_t id_;
