This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4508ff2ceea073197eb07b85c1c4bd28a9c7b3d Author: Andrew Wong <[email protected]> AuthorDate: Tue Mar 17 02:19:02 2020 -0700 tablet: cache delta stats when flushing a DMS This allows us to GC ancient rowsets that have had their DMS flushed, which is likely to be the case, given the default ancient history period is 7 days. Change-Id: I26e74467ed180cebbd8da360763ba9498661e5f3 Reviewed-on: http://gerrit.cloudera.org:8080/15460 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/tablet/delta_store.h | 19 +++++++++++------- src/kudu/tablet/delta_tracker.cc | 14 ++++++------- src/kudu/tablet/deltafile-test.cc | 2 +- src/kudu/tablet/deltafile.cc | 15 +++++++++++--- src/kudu/tablet/deltafile.h | 33 ++++++++++++++++++++----------- src/kudu/tablet/deltamemstore.h | 2 +- src/kudu/tablet/mt-tablet-test.cc | 4 ++-- src/kudu/tablet/tablet_history_gc-test.cc | 4 ---- src/kudu/tablet/tablet_replica-test.cc | 22 ++++++++++++++++++--- 9 files changed, 76 insertions(+), 39 deletions(-) diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h index 069600e..7859e9f 100644 --- a/src/kudu/tablet/delta_store.h +++ b/src/kudu/tablet/delta_store.h @@ -14,8 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_TABLET_DELTA_STORE_H -#define KUDU_TABLET_DELTA_STORE_H +#pragma once #include <cstddef> #include <cstdint> @@ -179,7 +178,7 @@ class DeltaStore { virtual Status Init(const fs::IOContext* io_context) = 0; // Whether this delta store was initialized or not. - virtual bool Initted() = 0; + virtual bool Initted() const = 0; // Create a DeltaIterator for the given projection. // @@ -204,11 +203,18 @@ class DeltaStore { virtual std::string ToString() const = 0; - // TODO remove this once we don't need to have delta_stats for both DMS and DFR. Currently - // DeltaTracker#GetColumnsIdxWithUpdates() needs to filter out DMS from the redo list but it - // can't without RTTI. + // TODO(jdcryans): remove this once we don't need to have delta_stats for + // both DMS and DFR. Currently DeltaTracker#GetColumnsIdxWithUpdates() needs + // to filter out DMS from the redo list but it can't without RTTI. virtual const DeltaStats& delta_stats() const = 0; + // Returns whether callers can use 'delta_stats()', either because they've + // been read from disk, or because the store has been initialized with the + // stats cached. + virtual bool has_delta_stats() const { + return Initted(); + } + virtual ~DeltaStore() {} }; @@ -558,4 +564,3 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter, } // namespace tablet } // namespace kudu -#endif diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc index 7b53fc8..2e8c2ea 100644 --- a/src/kudu/tablet/delta_tracker.cc +++ b/src/kudu/tablet/delta_tracker.cc @@ -122,6 +122,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks, s = DeltaFileReader::OpenNoInit(std::move(block), type, std::move(options), + /*delta_stats*/nullptr, &dfr); if (!s.ok()) { LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type) @@ -193,7 +194,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context ignore_result(down_cast<DeltaFileReader*>(delta_store.get())); shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store); - if (dfr->Initted()) { + if (dfr->has_delta_stats()) { delete_count += dfr->delta_stats().delete_count(); reinsert_count += dfr->delta_stats().reinsert_count(); update_count += dfr->delta_stats().UpdateCount(); @@ -461,9 +462,7 @@ bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) { if (!redo_delta_stores_.empty()) { newest_redo = redo_delta_stores_.back(); } - // TODO(awong): keep the delta stats cached after flushing a DMS so a flush - // doesn't invalidate this rowset. - return newest_redo && newest_redo->Initted() && + return newest_redo && newest_redo->has_delta_stats() && newest_redo->delta_stats().max_timestamp() < ancient_history_mark; } @@ -477,7 +476,7 @@ Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancie int64_t tmp_bytes = 0; for (const auto& undo : boost::adaptors::reverse(undos_newest_first)) { // Short-circuit once we hit an initialized delta block with 'max_timestamp' > AHM. - if (undo->Initted() && + if (undo->has_delta_stats() && undo->delta_stats().max_timestamp() >= ancient_history_mark) { break; } @@ -552,7 +551,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark, // Traverse oldest-first. for (auto& undo : boost::adaptors::reverse(undos_newest_first)) { - if (!undo->Initted()) break; // Never initialize the deltas in this code path (it's slow). + if (!undo->has_delta_stats()) break; if (undo->delta_stats().max_timestamp() >= ancient_history_mark) break; tmp_blocks_deleted++; tmp_bytes_deleted += undo->EstimateSize(); @@ -762,6 +761,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms, RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block), REDO, std::move(options), + std::move(stats), dfr)); VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read"; @@ -890,7 +890,7 @@ void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const set<ColumnId> column_ids_with_updates; for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) { // We won't force open files just to read their stats. - if (!ds->Initted()) { + if (!ds->has_delta_stats()) { continue; } diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc index 207b46c..c88396f 100644 --- a/src/kudu/tablet/deltafile-test.cc +++ b/src/kudu/tablet/deltafile-test.cc @@ -374,7 +374,7 @@ TEST_F(TestDeltaFile, TestLazyInit) { // Lazily opening the delta file should not trigger any reads. shared_ptr<DeltaFileReader> reader; ASSERT_OK(DeltaFileReader::OpenNoInit( - std::move(count_block), REDO, ReaderOptions(), &reader)); + std::move(count_block), REDO, ReaderOptions(), /*delta_stats*/nullptr, &reader)); ASSERT_EQ(0, bytes_read); // But initializing it should (only the first time). diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc index 227429a..5e83342 100644 --- a/src/kudu/tablet/deltafile.cc +++ b/src/kudu/tablet/deltafile.cc @@ -40,6 +40,7 @@ #include "kudu/fs/block_id.h" #include "kudu/fs/block_manager.h" #include "kudu/fs/fs_manager.h" +#include "kudu/gutil/port.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/delta_relevancy.h" @@ -217,6 +218,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block, RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block), delta_type, std::move(options), + /*delta_stats*/nullptr, &df_reader)); RETURN_NOT_OK(df_reader->Init(io_context)); @@ -227,6 +229,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block, Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block, DeltaType delta_type, ReaderOptions options, + unique_ptr<DeltaStats> delta_stats, shared_ptr<DeltaFileReader>* reader_out) { unique_ptr<CFileReader> cf_reader; const IOContext* io_context = options.io_context; @@ -234,7 +237,7 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block, std::move(options), &cf_reader)); unique_ptr<DeltaFileReader> df_reader( - new DeltaFileReader(std::move(cf_reader), delta_type)); + new DeltaFileReader(std::move(cf_reader), std::move(delta_stats), delta_type)); if (!FLAGS_cfile_lazy_open) { RETURN_NOT_OK(df_reader->Init(io_context)); } @@ -245,8 +248,10 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block, } DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader, + unique_ptr<DeltaStats> delta_stats, DeltaType delta_type) : reader_(cf_reader.release()), + delta_stats_(std::move(delta_stats)), delta_type_(delta_type) {} Status DeltaFileReader::Init(const IOContext* io_context) { @@ -264,7 +269,9 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) { } // Initialize delta file stats - RETURN_NOT_OK(ReadDeltaStats()); + if (!has_delta_stats()) { + RETURN_NOT_OK(ReadDeltaStats()); + } return Status::OK(); } @@ -280,6 +287,7 @@ Status DeltaFileReader::ReadDeltaStats() { } unique_ptr<DeltaStats> stats(new DeltaStats()); RETURN_NOT_OK(stats->InitFromPB(deltastats_pb)); + std::lock_guard<simple_spinlock> l(stats_lock_); delta_stats_ = std::move(stats); return Status::OK(); } @@ -317,7 +325,8 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager, RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block)); ReaderOptions options; options.parent_mem_tracker = parent_mem_tracker; - return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, out); + return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, + /*delta_stats*/nullptr, out); } Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts, diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h index 68222be..1850d1a 100644 --- a/src/kudu/tablet/deltafile.h +++ b/src/kudu/tablet/deltafile.h @@ -20,6 +20,7 @@ #include <cstdint> #include <deque> #include <memory> +#include <mutex> #include <string> #include <vector> @@ -32,12 +33,12 @@ #include "kudu/cfile/cfile_writer.h" #include "kudu/common/rowid.h" #include "kudu/gutil/macros.h" -#include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/delta_key.h" #include "kudu/tablet/delta_stats.h" #include "kudu/tablet/delta_store.h" #include "kudu/util/faststring.h" +#include "kudu/util/locks.h" #include "kudu/util/once.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -147,33 +148,40 @@ class DeltaFileReader : public DeltaStore, static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block, DeltaType delta_type, cfile::ReaderOptions options, + std::unique_ptr<DeltaStats> delta_stats, std::shared_ptr<DeltaFileReader>* reader_out); - virtual Status Init(const fs::IOContext* io_context) OVERRIDE; + Status Init(const fs::IOContext* io_context) override; - virtual bool Initted() OVERRIDE { + bool Initted() const override { return init_once_.init_succeeded(); } // See DeltaStore::NewDeltaIterator(...) Status NewDeltaIterator(const RowIteratorOptions& opts, - std::unique_ptr<DeltaIterator>* iterator) const OVERRIDE; + std::unique_ptr<DeltaIterator>* iterator) const override; // See DeltaStore::CheckRowDeleted - virtual Status CheckRowDeleted(rowid_t row_idx, - const fs::IOContext* io_context, - bool *deleted) const OVERRIDE; + Status CheckRowDeleted(rowid_t row_idx, + const fs::IOContext* io_context, + bool *deleted) const override; - virtual uint64_t EstimateSize() const OVERRIDE; + uint64_t EstimateSize() const override; const BlockId& block_id() const { return reader_->block_id(); } - virtual const DeltaStats& delta_stats() const OVERRIDE { - DCHECK(init_once_.init_succeeded()); + const DeltaStats& delta_stats() const override { + std::lock_guard<simple_spinlock> l(stats_lock_); + DCHECK(delta_stats_); return *delta_stats_; } - virtual std::string ToString() const OVERRIDE { + bool has_delta_stats() const override { + std::lock_guard<simple_spinlock> l(stats_lock_); + return delta_stats_ != nullptr; + } + + std::string ToString() const override { if (!init_once_.init_succeeded()) return reader_->ToString(); return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString()); } @@ -201,6 +209,7 @@ class DeltaFileReader : public DeltaStore, } DeltaFileReader(std::unique_ptr<cfile::CFileReader> cf_reader, + std::unique_ptr<DeltaStats> delta_stats, DeltaType delta_type); // Callback used in 'init_once_' to initialize this delta file. @@ -209,6 +218,8 @@ class DeltaFileReader : public DeltaStore, Status ReadDeltaStats(); std::shared_ptr<cfile::CFileReader> reader_; + + mutable simple_spinlock stats_lock_; std::unique_ptr<DeltaStats> delta_stats_; // The type of this delta, i.e. UNDO or REDO. diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h index 4a42551..7603ebc 100644 --- a/src/kudu/tablet/deltamemstore.h +++ b/src/kudu/tablet/deltamemstore.h @@ -81,7 +81,7 @@ class DeltaMemStore : public DeltaStore, virtual Status Init(const fs::IOContext* io_context) OVERRIDE; - virtual bool Initted() OVERRIDE { + virtual bool Initted() const OVERRIDE { return true; } diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc index f40b579..8cf354b 100644 --- a/src/kudu/tablet/mt-tablet-test.cc +++ b/src/kudu/tablet/mt-tablet-test.cc @@ -323,11 +323,11 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> { } } - void MinorCompactDeltasThread(int tid) { + void MinorCompactDeltasThread(int /*tid*/) { CompactDeltas(RowSet::MINOR_DELTA_COMPACTION); } - void MajorCompactDeltasThread(int tid) { + void MajorCompactDeltasThread(int /*tid*/) { CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION); } diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc index b5fa40b..a7e4ea0 100644 --- a/src/kudu/tablet/tablet_history_gc-test.cc +++ b/src/kudu/tablet/tablet_history_gc-test.cc @@ -668,10 +668,6 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) { NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true)); ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests()); - // TODO(awong): keep the delta stats cached after flushing a DMS so we don't - // have to scan to read stats. - NO_FATALS(VerifyTestRows(0, 0)); - // We shouldn't have any ancient rowsets since we haven't passed the AHM. int64_t bytes = 0; ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes)); diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc index c4b88d3..baad47c 100644 --- a/src/kudu/tablet/tablet_replica-test.cc +++ b/src/kudu/tablet/tablet_replica-test.cc @@ -810,7 +810,8 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) { auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge( tablet->GetMetricEntity(), [] () { return 0; }); - // Insert some rows and flush so we get a DRS. + // Insert some rows and flush so we get a DRS, and then delete them so we + // have an ancient, fully deleted DRS. ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows)); ASSERT_OK(tablet->Flush()); ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows)); @@ -828,14 +829,29 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) { ASSERT_OK(tablet->DeleteAncientDeletedRowsets()); ASSERT_EQ(1, tablet->num_rowsets()); ASSERT_EQ(kNumRows, live_row_count->value()); + ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows)); + ASSERT_EQ(0, live_row_count->value()); - // Restart and ensure we can get online okay. + // Restart and ensure we can rebuild our DMS okay. NO_FATALS(RestartReplica()); tablet = tablet_replica_->tablet(); ASSERT_EQ(1, tablet->num_rowsets()); live_row_count = METRIC_live_row_count.InstantiateFunctionGauge( tablet->GetMetricEntity(), [] () { return 0; }); - ASSERT_EQ(kNumRows, live_row_count->value()); + ASSERT_EQ(0, live_row_count->value()); + + // Now do that again but with deltafiles. + ASSERT_OK(tablet->FlushBiggestDMS()); + NO_FATALS(RestartReplica()); + tablet = tablet_replica_->tablet(); + ASSERT_EQ(1, tablet->num_rowsets()); + + // Wait for our deleted rowset to become ancient. Since we just started up, + // we shouldn't have read any delta stats, so running the GC won't pick up + // our deleted DRS. + SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec)); + ASSERT_OK(tablet->DeleteAncientDeletedRowsets()); + ASSERT_EQ(1, tablet->num_rowsets()); } } // namespace tablet
