Repository: kudu Updated Branches: refs/heads/master 0fbebd67c -> a68c9355b
tablet: add second snapshot to iterator options. This commit adds a second snapshot parameter called 'snap_to_exclude' and renames the existing snapshot to 'snap_to_include'. The new snapshot is hidden behind a boost::optional so that ignoring it is more efficient (i.e. a call to boost::optional::is_initialized() instead of MvccSnapshot::IsCommitted() on a snapshot including all transactions). Taken together, the two snapshots form a time range. If both are set, the iterator should only show transactions that are committed in 'snap_to_include' but not committed in 'snap_to_exclude'. This will be used in the upcoming diff scan API, which needs to produce results between two HT timestamps. Change-Id: If4f7390671a637962cc2e3851e8be0e3a6982b17 Reviewed-on: http://gerrit.cloudera.org:8080/10925 Tested-by: Adar Dembo <[email protected]> Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a68c9355 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a68c9355 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a68c9355 Branch: refs/heads/master Commit: a68c9355bfeb9d16964b97f6b1ade98a513dac54 Parents: 0fbebd6 Author: Adar Dembo <[email protected]> Authored: Tue Jul 10 16:19:51 2018 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Jul 19 22:06:17 2018 +0000 ---------------------------------------------------------------------- src/kudu/tablet/compaction.cc | 6 +++--- src/kudu/tablet/deltafile-test.cc | 10 +++++----- src/kudu/tablet/deltafile.cc | 24 ++++++++++++------------ src/kudu/tablet/deltamemstore-test.cc | 6 +++--- src/kudu/tablet/deltamemstore.cc | 2 +- src/kudu/tablet/diskrowset-test.cc | 4 ++-- src/kudu/tablet/memrowset-test.cc | 2 +- src/kudu/tablet/memrowset.cc | 4 ++-- src/kudu/tablet/rowset.cc | 2 +- src/kudu/tablet/rowset.h | 10 +++++++++- src/kudu/tablet/tablet-test-util.h | 2 +- src/kudu/tablet/tablet.cc | 2 +- 12 files changed, 41 
insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/compaction.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index ec9edb4..c754e26 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -94,7 +94,7 @@ class MemRowSetCompactionInput : public CompactionInput { has_more_blocks_(false) { RowIteratorOptions opts; opts.projection = projection; - opts.snap = snap; + opts.snap_to_include = snap; iter_.reset(memrowset.NewIterator(opts)); } @@ -861,7 +861,7 @@ Status CompactionInput::Create(const DiskRowSet &rowset, // Creates a DeltaIteratorMerger that will only include the relevant REDO deltas. RowIteratorOptions redo_opts; redo_opts.projection = projection; - redo_opts.snap = snap; + redo_opts.snap_to_include = snap; unique_ptr<DeltaIterator> redo_deltas; RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator( redo_opts, DeltaTracker::REDOS_ONLY, &redo_deltas), "Could not open REDOs"); @@ -869,7 +869,7 @@ Status CompactionInput::Create(const DiskRowSet &rowset, // "empty" snapshot ensures that all deltas are included. 
RowIteratorOptions undo_opts; undo_opts.projection = projection; - undo_opts.snap = MvccSnapshot::CreateSnapshotIncludingNoTransactions(); + undo_opts.snap_to_include = MvccSnapshot::CreateSnapshotIncludingNoTransactions(); unique_ptr<DeltaIterator> undo_deltas; RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator( undo_opts, DeltaTracker::UNDOS_ONLY, &undo_deltas), "Could not open UNDOs"); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/deltafile-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc index 3dbc9e6..19d08f0 100644 --- a/src/kudu/tablet/deltafile-test.cc +++ b/src/kudu/tablet/deltafile-test.cc @@ -150,7 +150,7 @@ class TestDeltaFile : public KuduTest { const shared_ptr<DeltaFileReader>& reader, gscoped_ptr<DeltaIterator>* out) { RowIteratorOptions opts; - opts.snap = type == REDO ? + opts.snap_to_include = type == REDO ? MvccSnapshot::CreateSnapshotIncludingAllTransactions() : MvccSnapshot::CreateSnapshotIncludingNoTransactions(); opts.projection = &schema_; @@ -341,8 +341,8 @@ TEST_F(TestDeltaFile, TestSkipsDeltasOutOfRange) { opts.projection = &schema_; // should skip - opts.snap = MvccSnapshot(Timestamp(9)); - ASSERT_FALSE(opts.snap.MayHaveCommittedTransactionsAtOrAfter(Timestamp(10))); + opts.snap_to_include = MvccSnapshot(Timestamp(9)); + ASSERT_FALSE(opts.snap_to_include.MayHaveCommittedTransactionsAtOrAfter(Timestamp(10))); DeltaIterator* raw_iter = nullptr; Status s = reader->NewDeltaIterator(opts, &raw_iter); ASSERT_TRUE(s.IsNotFound()); @@ -350,14 +350,14 @@ TEST_F(TestDeltaFile, TestSkipsDeltasOutOfRange) { // should include raw_iter = nullptr; - opts.snap = MvccSnapshot(Timestamp(15)); + opts.snap_to_include = MvccSnapshot(Timestamp(15)); ASSERT_OK(reader->NewDeltaIterator(opts, &raw_iter)); ASSERT_TRUE(raw_iter != nullptr); iter.reset(raw_iter); // should include raw_iter = nullptr; - opts.snap = 
MvccSnapshot(Timestamp(21)); + opts.snap_to_include = MvccSnapshot(Timestamp(21)); ASSERT_OK(reader->NewDeltaIterator(opts, &raw_iter)); ASSERT_TRUE(raw_iter != nullptr); iter.reset(raw_iter); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/deltafile.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc index d00c654..4cd832f 100644 --- a/src/kudu/tablet/deltafile.cc +++ b/src/kudu/tablet/deltafile.cc @@ -317,22 +317,22 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager, Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts, DeltaIterator** iterator) const { - if (IsRelevantForSnapshot(opts.snap)) { + if (IsRelevantForSnapshot(opts.snap_to_include)) { if (VLOG_IS_ON(2)) { if (!init_once_.init_succeeded()) { TRACE_COUNTER_INCREMENT("delta_iterators_lazy_initted", 1); VLOG(2) << (delta_type_ == REDO ? "REDO" : "UNDO") << " delta " << ToString() << " has no delta stats" - << ": can't cull for " << opts.snap.ToString(); + << ": can't cull for " << opts.snap_to_include.ToString(); } else if (delta_type_ == REDO) { VLOG(2) << "REDO delta " << ToString() << " has min ts " << delta_stats_->min_timestamp().ToString() - << ": can't cull for " << opts.snap.ToString(); + << ": can't cull for " << opts.snap_to_include.ToString(); } else { VLOG(2) << "UNDO delta " << ToString() << " has max ts " << delta_stats_->max_timestamp().ToString() - << ": can't cull for " << opts.snap.ToString(); + << ": can't cull for " << opts.snap_to_include.ToString(); } } @@ -345,7 +345,7 @@ Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts, } VLOG(2) << "Culling " << ((delta_type_ == REDO) ? 
"REDO":"UNDO") - << " delta " << ToString() << " for " << opts.snap.ToString(); + << " delta " << ToString() << " for " << opts.snap_to_include.ToString(); return Status::NotFound("MvccSnapshot outside the range of this delta."); } @@ -433,7 +433,7 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) { // that we are querying. We did this already before creating the // DeltaFileIterator, but due to lazy initialization, it's possible // that we weren't able to check at that time. - if (!dfr_->IsRelevantForSnapshot(opts_.snap)) { + if (!dfr_->IsRelevantForSnapshot(opts_.snap_to_include)) { exhausted_ = true; delta_blocks_.clear(); return Status::OK(); @@ -713,7 +713,7 @@ template<> inline Status ApplyingVisitor<REDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsRedoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { DVLOG(3) << "Applied redo delta"; return ApplyMutation(key, deltas); } @@ -725,7 +725,7 @@ template<> inline Status ApplyingVisitor<UNDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsUndoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { DVLOG(3) << "Applied undo delta"; return ApplyMutation(key, deltas); } @@ -787,7 +787,7 @@ template<> inline Status LivenessVisitor<REDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsRedoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { return ApplyDelete(key, deltas); } return Status::OK(); @@ -797,7 +797,7 @@ template<> inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsUndoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if 
(IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { return ApplyDelete(key, deltas); } return Status::OK(); @@ -843,7 +843,7 @@ template<> inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsRedoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { return Collect(key, deltas); } return Status::OK(); @@ -853,7 +853,7 @@ template<> inline Status CollectingVisitor<UNDO>::Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) { - if (IsUndoRelevant(dfi->opts_.snap, key.timestamp(), continue_visit)) { + if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), continue_visit)) { return Collect(key, deltas); } return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/deltamemstore-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc index e60cdc6..0c3930c 100644 --- a/src/kudu/tablet/deltamemstore-test.cc +++ b/src/kudu/tablet/deltamemstore-test.cc @@ -134,7 +134,7 @@ class TestDeltaMemStore : public KuduTest { 0); RowIteratorOptions opts; opts.projection = &single_col_projection; - opts.snap = snapshot; + opts.snap_to_include = snapshot; DeltaIterator* raw_iter; Status s = dms_->NewDeltaIterator(opts, &raw_iter); if (s.IsNotFound()) { @@ -452,7 +452,7 @@ TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) { RowIteratorOptions opts; opts.projection = &schema_; // TODO(todd): test snapshot reads from different points - opts.snap = MvccSnapshot(mvcc_); + opts.snap_to_include = MvccSnapshot(mvcc_); DeltaIterator* raw_iter; Status s = dms_->NewDeltaIterator(opts, &raw_iter); if (s.IsNotFound()) { @@ -500,7 +500,7 @@ TEST_F(TestDeltaMemStore, TestCollectMutations) { RowIteratorOptions opts; opts.projection = &schema_; - 
opts.snap = MvccSnapshot(mvcc_); + opts.snap_to_include = MvccSnapshot(mvcc_); DeltaIterator* raw_iter; Status s = dms_->NewDeltaIterator(opts, &raw_iter); if (s.IsNotFound()) { http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/deltamemstore.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc index c1abcef..ec07372 100644 --- a/src/kudu/tablet/deltamemstore.cc +++ b/src/kudu/tablet/deltamemstore.cc @@ -259,7 +259,7 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag flag) { DCHECK_GE(key.row_idx(), start_row); if (key.row_idx() > stop_row) break; - if (!opts_.snap.IsCommitted(key.timestamp())) { + if (!opts_.snap_to_include.IsCommitted(key.timestamp())) { // The transaction which applied this update is not yet committed // in this iterator's MVCC snapshot. Hence, skip it. iter_->Next(); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/diskrowset-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc index e64cdfa..105e8ba 100644 --- a/src/kudu/tablet/diskrowset-test.cc +++ b/src/kudu/tablet/diskrowset-test.cc @@ -416,7 +416,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) { SCOPED_TRACE(i); RowIteratorOptions opts; opts.projection = &schema_; - opts.snap = snaps[i]; + opts.snap_to_include = snaps[i]; gscoped_ptr<RowwiseIterator> iter; ASSERT_OK(rs->NewRowIterator(opts, &iter)); string data = InitAndDumpIterator(std::move(iter)); @@ -431,7 +431,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) { SCOPED_TRACE(i); RowIteratorOptions opts; opts.projection = &schema_; - opts.snap = snaps[i]; + opts.snap_to_include = snaps[i]; gscoped_ptr<RowwiseIterator> iter; ASSERT_OK(rs->NewRowIterator(opts, &iter)); string data = InitAndDumpIterator(std::move(iter)); 
http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/memrowset-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc index 047039f..027d05e 100644 --- a/src/kudu/tablet/memrowset-test.cc +++ b/src/kudu/tablet/memrowset-test.cc @@ -192,7 +192,7 @@ class TestMemRowSet : public KuduTest { int ScanAndCount(MemRowSet* mrs, const MvccSnapshot& snap) { RowIteratorOptions opts; opts.projection = &schema_; - opts.snap = snap; + opts.snap_to_include = snap; gscoped_ptr<MemRowSet::Iterator> iter(mrs->NewIterator(opts)); CHECK_OK(iter->Init(nullptr)); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/memrowset.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc index 3d25bc7..5e90eb9 100644 --- a/src/kudu/tablet/memrowset.cc +++ b/src/kudu/tablet/memrowset.cc @@ -496,7 +496,7 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) { iter_->GetCurrentEntry(&k, &v); MRSRow row(memrowset_.get(), v); - if (opts_.snap.IsCommitted(row.insertion_timestamp())) { + if (opts_.snap_to_include.IsCommitted(row.insertion_timestamp())) { if (has_upper_bound() && out_of_bounds(k)) { state_ = kFinished; break; @@ -542,7 +542,7 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow( for (const Mutation *mut = mutation_head; mut != nullptr; mut = mut->acquire_next()) { - if (!opts_.snap.IsCommitted(mut->timestamp_)) { + if (!opts_.snap_to_include.IsCommitted(mut->timestamp_)) { // Transaction which wasn't committed yet in the reader's snapshot. 
continue; } http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/rowset.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc index cf060f7..fb815fd 100644 --- a/src/kudu/tablet/rowset.cc +++ b/src/kudu/tablet/rowset.cc @@ -40,7 +40,7 @@ namespace tablet { RowIteratorOptions::RowIteratorOptions() : projection(nullptr), - snap(MvccSnapshot::CreateSnapshotIncludingAllTransactions()), + snap_to_include(MvccSnapshot::CreateSnapshotIncludingAllTransactions()), order(OrderMode::UNORDERED) {} DuplicatingRowSet::DuplicatingRowSet(RowSetVector old_rowsets, http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/rowset.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h index 3dcf920..d8e8bec 100644 --- a/src/kudu/tablet/rowset.h +++ b/src/kudu/tablet/rowset.h @@ -25,6 +25,7 @@ #include <string> #include <vector> +#include <boost/optional/optional.hpp> #include <glog/logging.h> #include "kudu/common/common.pb.h" @@ -72,7 +73,14 @@ struct RowIteratorOptions { // Transactions not committed in this snapshot will be ignored in the iteration. // // Defaults to a snapshot that includes all transactions. - MvccSnapshot snap; + MvccSnapshot snap_to_include; + + // Transactions committed in this snapshot will be ignored in the iteration. + // This is stored in a boost::optional so that iterators can ignore it + // entirely if it is unset (the common case). + // + // Defaults to none. + boost::optional<MvccSnapshot> snap_to_exclude; // Whether iteration should be ordered by primary key. Only relevant to those // iterators that deal with primary key order. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/tablet-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h index e43707d..b45c75d 100644 --- a/src/kudu/tablet/tablet-test-util.h +++ b/src/kudu/tablet/tablet-test-util.h @@ -235,7 +235,7 @@ static inline Status DumpRowSet(const RowSet &rs, int limit = INT_MAX) { RowIteratorOptions opts; opts.projection = &projection; - opts.snap = snap; + opts.snap_to_include = snap; gscoped_ptr<RowwiseIterator> iter; RETURN_NOT_OK(rs.NewRowIterator(opts, &iter)); RETURN_NOT_OK(iter->Init(nullptr)); http://git-wip-us.apache.org/repos/asf/kudu/blob/a68c9355/src/kudu/tablet/tablet.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 4a59847..6fe3e96 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -1739,7 +1739,7 @@ Status Tablet::CaptureConsistentIterators( RowIteratorOptions opts; opts.projection = projection; - opts.snap = snap; + opts.snap_to_include = snap; opts.order = order; // Grab the memrowset iterator.
