This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 8973a285eaebee0736a6dac29de5eca39746eb0c Author: helifu <[email protected]> AuthorDate: Tue Jul 9 19:11:23 2019 +0800 KUDU-2855 Lazy-create DeltaMemStore on first update This patch supports lazy-create DeltaMemStore on first update to save memory and to fast-path out any DMS-related code. The created DeltaMemStore will be destroyed on the following flush. Change-Id: Ie0c565d86647d5144266b30aa6e8572d42db48c6 Reviewed-on: http://gerrit.cloudera.org:8080/13821 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/tablet/delta_tracker.cc | 84 +++++++++++++++++++++------------- src/kudu/tablet/delta_tracker.h | 18 +++++--- src/kudu/tablet/diskrowset-test-base.h | 6 ++- src/kudu/tablet/diskrowset-test.cc | 7 +-- 4 files changed, 72 insertions(+), 43 deletions(-) diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc index 9400e32..4ea8684 100644 --- a/src/kudu/tablet/delta_tracker.cc +++ b/src/kudu/tablet/delta_tracker.cc @@ -95,7 +95,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata, read_only_(false), log_anchor_registry_(log_anchor_registry), mem_trackers_(std::move(mem_trackers)), - dms_empty_(true), + next_dms_id_(rowset_metadata_->last_durable_redo_dms_id() + 1), + dms_exists_(false), deleted_row_count_(0) {} Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks, @@ -151,15 +152,22 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) { &undo_delta_stores_, UNDO)); - // the id of the first DeltaMemStore is the max id of the current ones +1 - RETURN_NOT_OK(DeltaMemStore::Create(rowset_metadata_->last_durable_redo_dms_id() + 1, + open_ = true; + return Status::OK(); +} + +Status DeltaTracker::CreateAndInitDMSUnlocked(const fs::IOContext* io_context) { + DCHECK(component_lock_.is_write_locked()); + shared_ptr<DeltaMemStore> dms; + RETURN_NOT_OK(DeltaMemStore::Create(next_dms_id_++, rowset_metadata_->id(), log_anchor_registry_, mem_trackers_.dms_tracker, - &dms_)); - RETURN_NOT_OK(dms_->Init(io_context)); + &dms)); + RETURN_NOT_OK(dms->Init(io_context)); - open_ = true; + dms_.swap(dms); + dms_exists_.Store(true); return Status::OK(); } @@ -581,7 +589,9 @@ void DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas, } if (which != UNDOS_ONLY) { deltas->insert(deltas->end(), redo_delta_stores_.begin(), redo_delta_stores_.end()); - deltas->push_back(dms_); + if (dms_exists_.Load() && !dms_->Empty()) { + deltas->push_back(dms_); + } } } @@ -637,17 +647,32 @@ Status DeltaTracker::Update(Timestamp timestamp, const RowChangeList &update, const consensus::OpId& op_id, OperationResultPB* result) { - // TODO(todd): can probably lock this more fine-grained. - shared_lock<rw_spinlock> lock(component_lock_); + Status s; + while (true) { + if (!dms_exists_.Load()) { + std::lock_guard<rw_spinlock> lock(component_lock_); + // Should check dms_exists_ here in case multiple threads are blocked. + if (!dms_exists_.Load()) { + RETURN_NOT_OK(CreateAndInitDMSUnlocked(nullptr)); + } + } - Status s = dms_->Update(timestamp, row_idx, update, op_id); - if (s.ok()) { - dms_empty_.Store(false); + // TODO(todd): can probably lock this more fine-grained. + shared_lock<rw_spinlock> lock(component_lock_); - MemStoreTargetPB* target = result->add_mutated_stores(); - target->set_rs_id(rowset_metadata_->id()); - target->set_dms_id(dms_->id()); + // Should check dms_exists_ here again since there is a gap + // between the two critical sections defined by component_lock_. + if (!dms_exists_.Load()) continue; + + s = dms_->Update(timestamp, row_idx, update, op_id); + if (s.ok()) { + MemStoreTargetPB* target = result->add_mutated_stores(); + target->set_rs_id(rowset_metadata_->id()); + target->set_dms_id(dms_->id()); + } + break; } + return s; } @@ -655,12 +680,13 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_contex bool *deleted, ProbeStats* stats) const { shared_lock<rw_spinlock> lock(component_lock_); - *deleted = false; // Check if the row has a deletion in DeltaMemStore. - RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted)); - if (*deleted) { - return Status::OK(); + if (dms_exists_.Load()) { + RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted)); + if (*deleted) { + return Status::OK(); + } } // Then check backwards through the list of trackers. @@ -744,17 +770,11 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_ // This shuts out any concurrent readers or writers. std::lock_guard<rw_spinlock> lock(component_lock_); - count = dms_->Count(); + count = dms_exists_.Load() ? dms_->Count() : 0; - // Swap the DeltaMemStore to use the new schema - old_dms = dms_; - RETURN_NOT_OK(DeltaMemStore::Create(old_dms->id() + 1, - rowset_metadata_->id(), - log_anchor_registry_, - mem_trackers_.dms_tracker, - &dms_)); - RETURN_NOT_OK(dms_->Init(nullptr)); - dms_empty_.Store(true); + // Swap the DeltaMemStore and dms_ is null now. + old_dms.swap(dms_); + dms_exists_.Store(false); if (count == 0) { // No need to flush if there are no deltas. @@ -802,12 +822,12 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_ size_t DeltaTracker::DeltaMemStoreSize() const { shared_lock<rw_spinlock> lock(component_lock_); - return dms_->EstimateSize(); + return dms_exists_.Load() ? dms_->EstimateSize() : 0; } int64_t DeltaTracker::MinUnflushedLogIndex() const { shared_lock<rw_spinlock> lock(component_lock_); - return dms_->MinLogIndex(); + return dms_exists_.Load() ? dms_->MinLogIndex() : 0; } size_t DeltaTracker::CountUndoDeltaStores() const { @@ -871,7 +891,7 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) { int64_t DeltaTracker::CountDeletedRows() const { shared_lock<rw_spinlock> lock(component_lock_); DCHECK_GE(deleted_row_count_, 0); - return deleted_row_count_ + dms_->deleted_row_count(); + return deleted_row_count_ + (dms_exists_.Load() ? dms_->deleted_row_count() : 0); } string DeltaTracker::LogPrefix() const { diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h index f1dabb5..072b6e6 100644 --- a/src/kudu/tablet/delta_tracker.h +++ b/src/kudu/tablet/delta_tracker.h @@ -230,9 +230,9 @@ class DeltaTracker { // Get the delta MemStore's size in bytes, including pre-allocation. size_t DeltaMemStoreSize() const; - // Returns true if the DMS has no entries. This doesn't rely on the size. + // Returns true if the DMS doesn't exist. This doesn't rely on the size. bool DeltaMemStoreEmpty() const { - return dms_empty_.Load(); + return !dms_exists_.Load(); } // Get the minimum log index for this tracker's DMS, -1 if it wasn't set. @@ -322,6 +322,8 @@ class DeltaTracker { std::string LogPrefix() const; + Status CreateAndInitDMSUnlocked(const fs::IOContext* io_context); + std::shared_ptr<RowSetMetadata> rowset_metadata_; bool open_; @@ -337,6 +339,8 @@ class DeltaTracker { TabletMemTrackers mem_trackers_; + int64_t next_dms_id_; + // The current DeltaMemStore into which updates should be written. std::shared_ptr<DeltaMemStore> dms_; // The set of tracked REDO delta stores, in increasing timestamp order. @@ -345,12 +349,14 @@ class DeltaTracker { SharedDeltaStoreVector undo_delta_stores_; // The maintenance scheduler calls DeltaMemStoreEmpty() a lot. - // We cache this here to avoid having to take component_lock_ - // in order to satisfy this call. - AtomicBool dms_empty_; + // We use an atomic variable to indicate whether DMS exists or not and + // to avoid having to take component_lock_ in order to satisfy this call. + AtomicBool dms_exists_; // read-write lock protecting dms_ and {redo,undo}_delta_stores_. - // - Readers and mutators take this lock in shared mode. + // - Readers take this lock in shared mode. + // - Mutators take this lock in exclusive mode if they need to create + // a new DMS, and shared mode otherwise. // - Flushers take this lock in exclusive mode before they modify the // structure of the rowset. // diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h index 3c11c7c..787dcf3 100644 --- a/src/kudu/tablet/diskrowset-test-base.h +++ b/src/kudu/tablet/diskrowset-test-base.h @@ -59,7 +59,8 @@ class TestRowSet : public KuduRowSetTest { : KuduRowSetTest(CreateTestSchema()), n_rows_(FLAGS_roundtrip_num_rows), op_id_(consensus::MaximumOpId()), - clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) { + clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)), + log_anchor_registry_(new log::LogAnchorRegistry()) { CHECK_GT(n_rows_, 0); } @@ -327,7 +328,7 @@ class TestRowSet : public KuduRowSetTest { Status OpenTestRowSet(std::shared_ptr<DiskRowSet> *rowset) { return DiskRowSet::Open(rowset_meta_, - new log::LogAnchorRegistry(), + log_anchor_registry_.get(), TabletMemTrackers(), nullptr, rowset); @@ -341,6 +342,7 @@ class TestRowSet : public KuduRowSetTest { consensus::OpId op_id_; // Generally a "fake" OpId for these tests. scoped_refptr<clock::Clock> clock_; MvccManager mvcc_; + scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; }; } // namespace tablet diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc index 9155e4b..ecb6016 100644 --- a/src/kudu/tablet/diskrowset-test.cc +++ b/src/kudu/tablet/diskrowset-test.cc @@ -354,8 +354,8 @@ TEST_F(TestRowSet, TestDMSFlush) { ASSERT_OK(rs->FlushDeltas(nullptr)); - // Check that the DiskRowSet's DMS has now been emptied. - ASSERT_EQ(0, rs->delta_tracker_->dms_->Count()); + // Check that the DiskRowSet's DMS has not been initialized. + ASSERT_FALSE(rs->delta_tracker_->dms_); // Now read back the value column, and verify that the updates // are visible. @@ -739,6 +739,7 @@ INSTANTIATE_TEST_CASE_P(RowIteratorOptionsPermutations, DiffScanRowSetTest, // the test operates on a variety of different on-disk and in-memory layouts. TEST_P(DiffScanRowSetTest, TestFuzz) { fs::IOContext test_context; + scoped_refptr<log::LogAnchorRegistry> log_anchor_registry(new log::LogAnchorRegistry()); // Create and open a DRS with four rows. shared_ptr<DiskRowSet> rs; @@ -756,7 +757,7 @@ TEST_P(DiffScanRowSetTest, TestFuzz) { ASSERT_OK(WriteRow(rb.data(), &drsw)); } ASSERT_OK(drsw.Finish()); - ASSERT_OK(DiskRowSet::Open(rowset_meta_, new log::LogAnchorRegistry(), + ASSERT_OK(DiskRowSet::Open(rowset_meta_, log_anchor_registry.get(), TabletMemTrackers(), &test_context, &rs)); }
