This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 2ca437e KUDU-2612: have MRS iteration account for txn metadata
2ca437e is described below
commit 2ca437e25e93dba3d3678dd20d92ffa0b4ba2e29
Author: Andrew Wong <[email protected]>
AuthorDate: Tue Sep 22 00:35:35 2020 -0700
KUDU-2612: have MRS iteration account for txn metadata
This patch introduces the ability to iterate through the rows of an MRS
taking into account the transaction's commit status, rather than relying
on the apply timestamps of the individual mutations therein. It does so
by adding a reference to the TxnMetadata in the MRS. Upon iteration, if
the MRS has transaction metadata, Kudu uses it (rather than each row's
insertion timestamp) to determine relevancy.
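A minimal sketch of that relevancy decision, assuming a clean snapshot at time T that treats every timestamp strictly below T as applied. The names `SnapshotAtT`, `TxnCommitState`, and `IsInsertVisible` are hypothetical and do not exist in Kudu; only the branch on whether transaction metadata is present mirrors the `txn_meta ? ...IsCommitted(...) : ...IsApplied(...)` check in the memrowset.cc hunk.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical clean snapshot at time T: timestamps strictly below T are
// applied, and nothing at or above T is.
struct SnapshotAtT {
  uint64_t t;
  bool IsApplied(uint64_t ts) const { return ts < t; }
};

// Hypothetical stand-in for the commit state held in the transaction
// metadata. 'commit_ts' is empty until the transaction commits.
struct TxnCommitState {
  std::optional<uint64_t> commit_ts;
};

// Hypothetical helper: if the MRS belongs to a transaction ('txn' non-null),
// visibility depends only on the transaction's commit timestamp and the
// row's own insertion timestamp is ignored. Otherwise, the insertion
// timestamp decides.
bool IsInsertVisible(const SnapshotAtT& snap, const TxnCommitState* txn,
                     uint64_t insertion_ts) {
  if (txn != nullptr) {
    return txn->commit_ts.has_value() && snap.IsApplied(*txn->commit_ts);
  }
  return snap.IsApplied(insertion_ts);
}
```

With a commit timestamp of 20, a snapshot at 20 does not see the transactional rows while a snapshot at 21 does, which matches the `snap_before_commit` / `snap_after_commit` expectations in the new memrowset tests.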
As a refresher, the MvccManager tracks mutations by maintaining a
"current" MvccSnapshot that encapsulates timestamps for ops that have
been applied. Rather than tracking every applied timestamp
individually, the MvccManager keeps track of the currently
in-flight ops and the lower bound on future ops' timestamps, as
guaranteed by the TimeManager. Taken together, these define a watermark
timestamp below which all timestamps can be considered applied, as well
as a set of timestamps that are considered applied despite being
higher than the earliest in-flight (not-yet-applied) op's timestamp.
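To make the watermark and the set of applied timestamps concrete, here is a simplified sketch of how a snapshot might answer "is this timestamp applied?". It is an illustration under stated assumptions, not Kudu's MvccSnapshot: timestamps are plain `uint64_t` values, and although the member names echo fields visible in the mvcc.cc hunk (`all_applied_before_`, `none_applied_at_or_after_`, `applied_timestamps_`), the `SnapshotSketch` type itself is hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical, simplified model of an MVCC snapshot.
struct SnapshotSketch {
  // Every timestamp strictly below this watermark is applied.
  uint64_t all_applied_before = 0;
  // No timestamp at or above this bound is applied.
  uint64_t none_applied_at_or_after = 0;
  // Timestamps between the two bounds that are applied even though an
  // earlier op is still in flight.
  std::set<uint64_t> applied_timestamps;

  bool IsApplied(uint64_t ts) const {
    if (ts < all_applied_before) return true;          // fast path: below the watermark
    if (ts >= none_applied_at_or_after) return false;  // fast path: at or beyond the upper bound
    return applied_timestamps.count(ts) > 0;           // slow path: explicit set lookup
  }
};
```

With a watermark of 10, an upper bound of 20, and {12, 15} applied out of order, timestamp 5 is answered by the fast path, while 12 and 13 require the slow-path set lookup.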
The MvccManager hands out MvccSnapshots that determine whether iterators
should consider certain timestamps relevant to iteration. These
snapshots are used in the following ways:
- Snapshot scans:
  - The user input is a timestamp, which is used to generate an
    MvccSnapshot defined by that timestamp, i.e. all timestamps below it
    are applied, and all timestamps at or above it are not applied.
  - Such a snapshot is defined to be a "clean" snapshot.
  - Before iterating through data, Kudu waits for the safe time to pass
    beyond the given timestamp, and waits for all ops with lower
    timestamps to complete. Only then can Kudu safely iterate through
    mutations with certainty that relevancy can be determined via a
    simple comparison against the clean snapshot.
- Diff scans:
  - Similar to the above case, but with a second, lower input timestamp
    that serves as a lower bound on relevant mutation timestamps.
- READ_LATEST scans:
  - Unlike the above two scenarios, no input timestamp is given here.
    Instead, Kudu uses the MvccManager's current MvccSnapshot, which
    isn't guaranteed to be a clean snapshot.
  - If it can, Kudu uses the watermark to determine relevancy (fast
    path, as with clean snapshots); if not, it falls back on the set
    of higher timestamps that are considered applied (slow path).
- Flushes and compactions:
  - Snapshots are also used during flushes and compactions to track
    ops that get applied while the flush or compaction is running, so
    that any ops missed while swapping in the new data stores can be
    duplicated onto them.
  - As with READ_LATEST, the snapshots used here aren't necessarily
    clean snapshots.
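The difference between snapshot scans and diff scans can be sketched with clean snapshots alone. In this hypothetical helper (`CleanSnapshot` and `IsMutationRelevant` are illustrative names, not Kudu APIs), a snapshot scan passes no lower bound, and a diff scan additionally excludes anything already applied in the lower snapshot.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical clean snapshot: every timestamp strictly below 'ts' is
// applied and nothing at or above it is.
struct CleanSnapshot {
  uint64_t ts;
  bool IsApplied(uint64_t t) const { return t < ts; }
};

// Hypothetical helper: a mutation is relevant if it is applied in
// 'snap_to_include' and, for diff scans, not already applied in
// 'snap_to_exclude'. Snapshot scans pass std::nullopt as the lower bound.
bool IsMutationRelevant(const std::optional<CleanSnapshot>& snap_to_exclude,
                        const CleanSnapshot& snap_to_include,
                        uint64_t mutation_ts) {
  if (snap_to_exclude && snap_to_exclude->IsApplied(mutation_ts)) {
    return false;  // already visible as of the lower bound
  }
  return snap_to_include.IsApplied(mutation_ts);
}
```

In timestamp terms, a diff scan between clean snapshots at 10 and 20 keeps mutations with timestamps 10 through 19, while a snapshot scan at 20 keeps everything below 20.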
Based on the above usages, this patch distinguishes between two types of
MvccSnapshots that together cover all current usages:
- kTimestamp: we are iterating as of a specific timestamp T. We must
guarantee that iteration will see all mutations made visible before T
(i.e. Raft committed before T for non-transaction ops, transaction
committed before T for transaction ops). We may wait for MVCC ops to
complete to ensure this is guaranteed. Scans in this mode are
repeatable. Snapshot and diff scans use kTimestamp snapshots.
- kLatest: we are iterating without waiting for the completion of any
ops -- instead, we only care about seeing a view of the latest
completed ops, regardless of whether there are non-applied ops from
before the latest applied ops. READ_LATEST scans and flushes use
kLatest snapshots.
In the context of evaluating commit status in transactions, these
snapshot types behave as follows when iterating:
- kTimestamp: since we care about displaying all ops or transactions
from before T, scanners should wait for T to become safe, and for ops
before T to complete (including all commit MVCC ops). After waiting,
all transactions that would have a commit timestamp lower than T will
have a commit timestamp in their metadata. As such, it's sufficient,
while iterating, to compare the commit timestamp of each transactional
mutation against T. If no commit timestamp exists for a transactional
mutation, it must not have committed as of T.
- kLatest: since we don't care about using a clean snapshot, it's
sufficient to use the current snapshot, which includes transactions'
commit MVCC ops. If that op is applied in the snapshot for a given
transaction, Kudu should check whether the transaction was aborted or
committed. If the op was not applied in the snapshot, the transaction
could not have committed as of that snapshot.
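The two evaluation rules above can be sketched as follows. `TxnMetaSketch`, `SnapshotType`, and `TypedSnapshotSketch` are hypothetical stand-ins (not Kudu's TxnMetadata or MvccSnapshot), and the field layout is an assumption; the control flow follows the kTimestamp and kLatest descriptions in this message. The explicit `aborted` check in the kLatest branch matters because an aborted commit op is never marked applied, but the watermark may still move past its timestamp once later ops are applied.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <set>

// Hypothetical stand-in for the transaction metadata described above.
struct TxnMetaSketch {
  std::optional<uint64_t> commit_mvcc_op_ts;  // set when the commit MVCC op starts
  std::optional<uint64_t> commit_ts;          // set while that op is applying
  bool aborted = false;
};

enum class SnapshotType { kTimestamp, kLatest };

struct TypedSnapshotSketch {
  SnapshotType type = SnapshotType::kLatest;
  uint64_t all_applied_before = 0;
  uint64_t none_applied_at_or_after = 0;
  std::set<uint64_t> applied_timestamps;

  bool IsApplied(uint64_t ts) const {
    if (ts < all_applied_before) return true;
    if (ts >= none_applied_at_or_after) return false;
    return applied_timestamps.count(ts) > 0;
  }

  bool IsCommitted(const TxnMetaSketch& txn) const {
    if (type == SnapshotType::kTimestamp) {
      // The scanner already waited for T to be safe and for earlier ops
      // (including commit MVCC ops) to finish, so comparing the commit
      // timestamp against T suffices.
      return txn.commit_ts.has_value() && IsApplied(*txn.commit_ts);
    }
    // kLatest: if the commit MVCC op isn't applied in this snapshot, the
    // transaction can't have committed as of this snapshot.
    if (!txn.commit_mvcc_op_ts.has_value() ||
        !IsApplied(*txn.commit_mvcc_op_ts)) {
      return false;
    }
    // The op finished, so check whether it ended in a commit or an abort.
    return !txn.aborted && txn.commit_ts.has_value();
  }
};
```

In a kLatest snapshot with watermark 10 and timestamp 20 applied out of order, a commit op at 20 counts as applied, one at 15 (still in flight) does not, and an aborted op at 5 falls below the watermark but is rejected by the `aborted` check.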
This patch only adds the APIs to the MvccManager, with some initial
usage for snapshot and diff scans in the memrowset; there is still no
way to exercise these APIs using a real tablet, nor is there a way to
persist the MVCC op timestamp in metadata. These will come in later
patches.
Change-Id: I6bb02c6025eea1a327cf9d9ee1f14a38d63ae4ad
Reviewed-on: http://gerrit.cloudera.org:8080/16510
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/memrowset-test.cc | 159 +++++++++++++++++++++++++++++++
src/kudu/tablet/memrowset.cc | 72 ++++++++++----
src/kudu/tablet/memrowset.h | 40 +++++++-
src/kudu/tablet/metadata.proto | 2 +
src/kudu/tablet/mvcc-test.cc | 162 +++++++++++++++++++++++++++++++-
src/kudu/tablet/mvcc.cc | 108 ++++++++++-----------
src/kudu/tablet/mvcc.h | 109 ++++++++++++++++-----
src/kudu/tablet/tablet_metadata-test.cc | 3 +-
src/kudu/tablet/tablet_metadata.cc | 10 +-
src/kudu/tablet/tablet_metadata.h | 40 +-------
src/kudu/tablet/txn_metadata.h | 94 ++++++++++++++++++
src/kudu/tablet/txn_participant.cc | 8 +-
12 files changed, 653 insertions(+), 154 deletions(-)
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index f26293d..65205e4 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -49,6 +49,7 @@
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/faststring.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
@@ -224,6 +225,24 @@ class TestMemRowSet : public KuduTest {
return fetched;
}
+ // Checks the number of rows in the range (-Inf, snap].
+ bool CheckRowsAtSnapshot(MemRowSet* mrs, const MvccSnapshot& snap, int expected_rows) {
+ RowIteratorOptions opts;
+ opts.snap_to_include = snap;
+ opts.projection = &schema_;
+ return expected_rows == ScanAndCount(mrs, opts);
+ }
+
+ // Checks the number of mutations in the range (snap_to_exc, snap_to_inc].
+ bool CheckRowsBetween(MemRowSet* mrs, const MvccSnapshot& snap_to_exc,
+ const MvccSnapshot& snap_to_inc, int expected_rows) {
+ RowIteratorOptions opts;
+ opts.snap_to_exclude = snap_to_exc;
+ opts.snap_to_include = snap_to_inc;
+ opts.projection = &schema_;
+ return expected_rows == ScanAndCount(mrs, opts);
+ }
+
Status GenerateTestData(MemRowSet* mrs) {
// row 0 - insert
// row 1 - insert, update
@@ -853,5 +872,145 @@ TEST_F(TestMemRowSet, TestCountLiveRows) {
NO_FATALS(CheckLiveRowsCount(6));
}
+// Test that rows inserted as a part of a transaction only get returned if the
+// transaction is committed in the iteration snapshot.
+TEST_F(TestMemRowSet, TestCommittedTransactionalRows) {
+ const int64_t kTxnId = 0;
+ shared_ptr<MemRowSet> mrs;
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ ASSERT_OK(MemRowSet::Create(/*mrs_id*/0, schema_, kTxnId, txn_meta, log_anchor_registry_.get(),
+ MemTracker::GetRootTracker(), &mrs));
+ MvccSnapshot latest_with_none_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "hello world", 12345));
+ MvccSnapshot latest_with_one_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "goodbye world", 54321));
+ MvccSnapshot latest_with_two_applied = MvccSnapshot(mvcc_);
+
+ ASSERT_EQ(2, mrs->entry_count());
+
+ // Iterating through the MRS with a snapshot that doesn't have the
+ // transaction committed should yield no rows.
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_two_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_one_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_none_applied, 0));
+
+ // Emulate transaction commit by starting an MVCC op timestamp higher than
+ // the transaction so ops with lower timestamps are considered applied.
+ //
+ // Only once we set the commit timestamp in the txn metadata and finish the
+ // op will iteration be able to read any rows.
+ auto mvcc_op_ts = clock_.Now();
+ ScopedOp commit_op(&mvcc_, mvcc_op_ts);
+ txn_meta->set_commit_mvcc_op_timestamp(mvcc_op_ts);
+ MvccSnapshot snap_after_begin_commit = MvccSnapshot(Timestamp(mvcc_op_ts.value() + 1));
+ MvccSnapshot snap_before_begin_commit = MvccSnapshot(Timestamp(mvcc_op_ts.value()));
+ MvccSnapshot latest_after_begin_commit = MvccSnapshot(mvcc_);
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_after_begin_commit, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_before_begin_commit, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_after_begin_commit, 0));
+
+ auto commit_ts = clock_.Now();
+ commit_op.StartApplying();
+ txn_meta->set_commit_timestamp(commit_ts);
+ commit_op.FinishApplying();
+
+ MvccSnapshot snap_after_commit = MvccSnapshot(Timestamp(commit_ts.value() + 1));
+ MvccSnapshot snap_before_commit = MvccSnapshot(commit_ts);
+ MvccSnapshot latest_after_commit = MvccSnapshot(mvcc_);
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_after_commit, 2));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_before_commit, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_after_commit, 2));
+
+ // Scanning for the rows inserted between the snapshots that didn't have
+ // the commit and the snapshot with the commit, we should see the rows.
+ NO_FATALS(CheckRowsBetween(mrs.get(), latest_with_two_applied, latest_after_commit, 2));
+ NO_FATALS(CheckRowsBetween(mrs.get(), latest_with_one_applied, latest_after_commit, 2));
+ NO_FATALS(CheckRowsBetween(mrs.get(), latest_with_none_applied, latest_after_commit, 2));
+ NO_FATALS(CheckRowsBetween(mrs.get(), snap_before_commit, latest_after_commit, 2));
+
+ // If we scan for rows in between snapshots with the commit, we shouldn't
+ // see anything.
+ NO_FATALS(CheckRowsBetween(mrs.get(), snap_after_commit, latest_after_commit, 0));
+ NO_FATALS(CheckRowsBetween(mrs.get(), latest_after_commit, latest_after_commit, 0));
+}
+
+// Like the above test, but testing the behavior when aborting before beginning
+// to commit.
+TEST_F(TestMemRowSet, TestAbortBeforeBeginningToCommitTransactionalRows) {
+ const int64_t kTxnId = 0;
+ shared_ptr<MemRowSet> mrs;
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ ASSERT_OK(MemRowSet::Create(/*mrs_id*/0, schema_, kTxnId, txn_meta, log_anchor_registry_.get(),
+ MemTracker::GetRootTracker(), &mrs));
+ MvccSnapshot latest_with_none_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "hello world", 12345));
+ MvccSnapshot latest_with_one_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "goodbye world", 54321));
+ MvccSnapshot latest_with_two_applied = MvccSnapshot(mvcc_);
+
+ ASSERT_EQ(2, mrs->entry_count());
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_two_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_one_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_none_applied, 0));
+ txn_meta->set_aborted();
+ MvccSnapshot latest_after_abort = MvccSnapshot(mvcc_);
+
+ // No matter how we iterate, we should see no rows.
+ const vector<MvccSnapshot> all_snaps = {
+ latest_with_none_applied, latest_with_one_applied, latest_with_two_applied, latest_after_abort
+ };
+ for (const auto& left_snap : all_snaps) {
+ for (const auto& right_snap : all_snaps) {
+ NO_FATALS(CheckRowsBetween(mrs.get(), left_snap, right_snap, 0));
+ }
+ }
+}
+
+// Like the above test, but testing the behavior when aborting after beginning
+// to commit.
+TEST_F(TestMemRowSet, TestAbortAfterBeginningToCommitTransactionalRows) {
+ const int64_t kTxnId = 0;
+ shared_ptr<MemRowSet> mrs;
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ ASSERT_OK(MemRowSet::Create(/*mrs_id*/0, schema_, kTxnId, txn_meta, log_anchor_registry_.get(),
+ MemTracker::GetRootTracker(), &mrs));
+ MvccSnapshot latest_with_none_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "hello world", 12345));
+ MvccSnapshot latest_with_one_applied = MvccSnapshot(mvcc_);
+ ASSERT_OK(InsertRow(mrs.get(), "goodbye world", 54321));
+ MvccSnapshot latest_with_two_applied = MvccSnapshot(mvcc_);
+
+ ASSERT_EQ(2, mrs->entry_count());
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_two_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_one_applied, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_with_none_applied, 0));
+
+ // Start beginning to commit, but abort without finishing the commit.
+ auto mvcc_op_ts = clock_.Now();
+ ScopedOp commit_op(&mvcc_, mvcc_op_ts);
+ txn_meta->set_commit_mvcc_op_timestamp(mvcc_op_ts);
+ MvccSnapshot snap_after_begin_commit = MvccSnapshot(Timestamp(mvcc_op_ts.value() + 1));
+ MvccSnapshot snap_before_begin_commit = MvccSnapshot(Timestamp(mvcc_op_ts.value()));
+ MvccSnapshot latest_after_begin_commit = MvccSnapshot(mvcc_);
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_after_begin_commit, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), snap_before_begin_commit, 0));
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_after_begin_commit, 0));
+ txn_meta->set_aborted();
+ commit_op.Abort();
+ MvccSnapshot latest_after_abort = MvccSnapshot(mvcc_);
+
+ // No matter how we iterate, we should see no rows.
+ NO_FATALS(CheckRowsAtSnapshot(mrs.get(), latest_after_abort, 0));
+ const vector<MvccSnapshot> all_snaps = {
+ latest_with_none_applied, latest_with_one_applied, latest_with_two_applied,
+ snap_after_begin_commit, snap_before_begin_commit, latest_after_begin_commit
+ };
+ for (const auto& left_snap : all_snaps) {
+ for (const auto& right_snap : all_snaps) {
+ NO_FATALS(CheckRowsBetween(mrs.get(), left_snap, right_snap, 0));
+ }
+ }
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 11436e6..3f1796f 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -22,6 +22,7 @@
#include <utility>
#include <vector>
+#include <boost/none_t.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -42,6 +43,7 @@
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/memory.h"
@@ -50,18 +52,18 @@ DEFINE_bool(mrs_use_codegen, true, "whether the memrowset should use code "
"generation for iteration");
TAG_FLAG(mrs_use_codegen, hidden);
+using kudu::consensus::OpId;
+using kudu::fs::IOContext;
+using kudu::log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
-
-namespace kudu { namespace tablet {
-
-using consensus::OpId;
-using fs::IOContext;
-using log::LogAnchorRegistry;
using strings::Substitute;
+namespace kudu {
+namespace tablet {
+
static const int kInitialArenaSize = 16;
bool MRSRow::IsGhost() const {
@@ -95,17 +97,37 @@ Status MemRowSet::Create(int64_t id,
shared_ptr<MemTracker> parent_tracker,
shared_ptr<MemRowSet>* mrs) {
auto local_mrs(MemRowSet::make_shared(
- id, schema, log_anchor_registry, std::move(parent_tracker)));
+ id, schema, /*txn_id*/boost::none, /*txn_metadata*/nullptr, log_anchor_registry,
+ std::move(parent_tracker)));
+ *mrs = std::move(local_mrs);
+ return Status::OK();
+}
+
+Status MemRowSet::Create(int64_t id,
+ const Schema &schema,
+ int64_t txn_id,
+ scoped_refptr<TxnMetadata> txn_metadata,
+ LogAnchorRegistry* log_anchor_registry,
+ shared_ptr<MemTracker> parent_tracker,
+ shared_ptr<MemRowSet>* mrs) {
+ auto local_mrs(MemRowSet::make_shared(
+ id, schema, boost::make_optional(txn_id), std::move(txn_metadata), log_anchor_registry,
+ std::move(parent_tracker)));
*mrs = std::move(local_mrs);
return Status::OK();
}
+
MemRowSet::MemRowSet(int64_t id,
- const Schema &schema,
+ const Schema& schema,
+ boost::optional<int64_t> txn_id,
+ scoped_refptr<TxnMetadata> txn_metadata,
LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker)
: id_(id),
schema_(schema),
+ txn_id_(txn_id),
+ txn_metadata_(std::move(txn_metadata)),
allocator_(new MemoryTrackingBufferAllocator(
HeapBufferAllocator::Get(),
CreateMemTrackerForMemRowSet(id, std::move(parent_tracker)))),
@@ -130,8 +152,13 @@ Status MemRowSet::DebugDump(vector<string> *lines) {
while (iter->HasNext()) {
MRSRow row = iter->GetCurrentRow();
LOG_STRING(INFO, lines)
- << "@" << row.insertion_timestamp() << ": row "
- << schema_.DebugRow(row)
+ << "@" << row.insertion_timestamp()
+ << string(txn_id_ ?
+ Substitute("(txn_id=$0$1)", *txn_id_,
+ DCHECK_NOTNULL(txn_metadata_)->commit_timestamp() ?
+ Substitute("@$0", txn_metadata_->commit_timestamp()->value()) : "") :
+ "")
+ << ": row " << schema_.DebugRow(row)
<< " mutations=" << Mutation::StringifyMutationList(schema_,
row.header_->redo_head)
<< std::endl;
iter->Next();
@@ -181,8 +208,8 @@ Status MemRowSet::Insert(Timestamp timestamp,
RETURN_NOT_OK(mrsrow.CopyRow(row, arena_.get()));
CHECK(mutation.Insert(mrsrow_slice))
- << "Expected to be able to insert, since the prepared mutation "
- << "succeeded!";
+ << "Expected to be able to insert, since the prepared mutation "
+ << "succeeded!";
}
anchorer_.AnchorIfMinimum(op_id.index());
@@ -513,15 +540,23 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
// 2. Both 'snap_to exclude' and 'snap_to_include' are set. The time range
// is [snap_to_exclude, snap_to_include).
//
- // If the row's insertion timestamp is committed in 'snap_to_exclude', it
- // means the insertion was outside this iterator's time range (i.e. the
- // insert was "excluded"). However, subsequent mutations may be inside the
- // time range, so we must still project the row and walk its mutation list.
+ // If a non-transactional row's insertion timestamp is applied in
+ // 'snap_to_exclude' or a transactional row's commit timestamp is committed
+ // in 'snap_to_exclude', the insertion was outside this iterator's time
+ // range (i.e. the insert was "excluded"). However, subsequent mutations
+ // may be inside the time range, so we must still project the row and walk
+ // its mutation list.
+ const auto& txn_meta = memrowset_->txn_metadata();
bool insert_excluded = opts_.snap_to_exclude &&
- opts_.snap_to_exclude->IsApplied(row.insertion_timestamp());
+ // TODO(awong): if we find this to be too slow, we should be able to
+ // compute IsCommitted() once per iterator at construction time.
+ (txn_meta ? opts_.snap_to_exclude->IsCommitted(*txn_meta.get()) :
+ opts_.snap_to_exclude->IsApplied(row.insertion_timestamp()));
bool unset_in_sel_vector;
ApplyStatus apply_status;
- if (insert_excluded || opts_.snap_to_include.IsApplied(row.insertion_timestamp())) {
+ if (insert_excluded ||
+ (txn_meta ? opts_.snap_to_include.IsCommitted(*txn_meta.get()) :
+ opts_.snap_to_include.IsApplied(row.insertion_timestamp()))) {
RETURN_NOT_OK(projector_->ProjectRowForRead(row, &dst_row,
dst->arena()));
// Roll-forward MVCC for committed updates.
@@ -665,6 +700,7 @@ Status MemRowSet::Iterator::GetCurrentRow(RowBlockRow* dst_row,
Arena* mutation_arena,
Timestamp* insertion_timestamp) {
+ DCHECK(boost::none == opts_.snap_to_exclude);
DCHECK(redo_head != nullptr);
// Get the row from the MemRowSet. It may have a different schema from the iterator projection.
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 3b3fbcb..3e5ebfa 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -37,6 +37,7 @@
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/concurrent_btree.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
@@ -89,6 +90,7 @@ class CompactionInput;
class MemRowSet;
class Mutation;
class OperationResultPB;
+class TxnMetadata;
// The value stored in the CBTree for a single row.
class MRSRow {
@@ -210,11 +212,20 @@ class MemRowSet : public RowSet,
class Iterator;
static Status Create(int64_t id,
- const Schema &schema,
+ const Schema& schema,
log::LogAnchorRegistry* log_anchor_registry,
std::shared_ptr<MemTracker> parent_tracker,
std::shared_ptr<MemRowSet>* mrs);
+ // Create() variant for an MRS that gets inserted into by a single transaction
+ // with the given transaction ID and metadata.
+ static Status Create(int64_t id,
+ const Schema& schema,
+ int64_t txn_id,
+ scoped_refptr<TxnMetadata> txn_metadata,
+ log::LogAnchorRegistry* log_anchor_registry,
+ std::shared_ptr<MemTracker> parent_tracker,
+ std::shared_ptr<MemRowSet>* mrs);
~MemRowSet();
// Insert a new row into the memrowset.
@@ -231,7 +242,6 @@ class MemRowSet : public RowSet,
const ConstContiguousRow& row,
const consensus::OpId& op_id);
-
// Update or delete an existing row in the memrowset.
//
// Returns Status::NotFound if the row doesn't exist.
@@ -348,6 +358,19 @@ class MemRowSet : public RowSet,
return id_;
}
+ // Transaction ID that inserted the rows of this MRS. 'none' if the rows in
+ // this MRS were not inserted as a part of a transaction.
+ const boost::optional<int64_t>& txn_id() const {
+ return txn_id_;
+ }
+
+ // Transaction metadata of the transaction that inserted the rows of this
+ // MRS. 'nullptr' if the rows in this MRS were not inserted as a part of a
+ // transaction.
+ const scoped_refptr<TxnMetadata>& txn_metadata() const {
+ return txn_metadata_;
+ }
+
std::shared_ptr<RowSetMetadata> metadata() override {
return std::shared_ptr<RowSetMetadata>(
reinterpret_cast<RowSetMetadata *>(NULL));
@@ -431,7 +454,9 @@ class MemRowSet : public RowSet,
protected:
MemRowSet(int64_t id,
- const Schema &schema,
+ const Schema& schema,
+ boost::optional<int64_t> txn_id,
+ scoped_refptr<TxnMetadata> txn_metadata,
log::LogAnchorRegistry* log_anchor_registry,
std::shared_ptr<MemTracker> parent_tracker);
@@ -447,8 +472,12 @@ class MemRowSet : public RowSet,
typedef btree::CBTree<MSBTreeTraits> MSBTree;
int64_t id_;
-
const Schema schema_;
+
+ // The transaction ID that inserted into this MemRowSet, and its corresponding metadata.
+ boost::optional<int64_t> txn_id_;
+ scoped_refptr<TxnMetadata> txn_metadata_;
+
std::shared_ptr<MemoryTrackingBufferAllocator> allocator_;
std::shared_ptr<ThreadSafeMemoryTrackingArena> arena_;
@@ -526,7 +555,8 @@ class MemRowSet::Iterator : public RowwiseIterator {
return MRSRow(memrowset_.get(), mrsrow_data);
}
- // Copy the current MRSRow to the 'dst_row' provided using the iterator projection schema.
+ // Copy the current MRSRow to the 'dst_row' provided using the iterator
+ // projection schema. Used in compactions.
Status GetCurrentRow(RowBlockRow* dst_row,
Arena* row_arena,
Mutation** redo_head,
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 00e28ff..e59ee80 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -87,6 +87,8 @@ message TxnMetadataPB {
// set.
optional int64 commit_timestamp = 2;
+ // TODO(awong): persist the op timestamp for the commit's MVCC op.
+
// TODO(awong): add an owner field to this for uncommitted transactions.
}
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index 6cf152e..a47eec1 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -30,6 +30,9 @@
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/txn_metadata.h"
+#include "kudu/util/barrier.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -39,6 +42,7 @@
using std::thread;
using std::unique_ptr;
+using std::vector;
METRIC_DECLARE_entity(server);
@@ -226,7 +230,7 @@ TEST_F(MvccTest, TestOutOfOrderOps) {
// Tests starting ops at a point-in-time in the past and applying them while
// adjusting the new op timestamp lower bound.
-TEST_F(MvccTest, TestSafeTimeWithOutOfOrderTxns) {
+TEST_F(MvccTest, TestSafeTimeWithOutOfOrderOps) {
MvccManager mgr;
// Set the clock to some time in the "future".
@@ -606,8 +610,7 @@ TEST_F(MvccTest, TestDontWaitAfterClose) {
// Test that if we abort an op we don't advance the new op lower bound and
// don't add the op to the applied set.
-TEST_F(MvccTest, TestTxnAbort) {
-
+TEST_F(MvccTest, TestOpAbort) {
MvccManager mgr;
// Ops with timestamps 1 through 3
@@ -781,5 +784,158 @@ TEST_F(MvccTest, TestCorrectInitWithNoOps) {
EXPECT_EQ(snap2.applied_timestamps_.size(), 0);
}
+class TransactionMvccTest : public MvccTest {
+ public:
+ // Simulates successfully committing the given transaction by starting an MVCC
+ // op to track the commit, and setting a higher commit timestamp while that
+ // op is applying. Returns the commit timestamp.
+ Timestamp Commit(MvccManager* mgr, TxnMetadata* txn_meta) {
+ // Start an op to begin committing.
+ Timestamp ts = clock_.Now();
+ ScopedOp op(mgr, ts);
+ txn_meta->set_commit_mvcc_op_timestamp(ts);
+ // Finalize the commit with some future timestamp.
+ op.StartApplying();
+ Timestamp commit_ts = Timestamp(ts.value() + 10);
+ txn_meta->set_commit_timestamp(commit_ts);
+ op.FinishApplying();
+ return commit_ts;
+ }
+
+ // Simulates aborting a transaction by beginning to commit, and then aborting.
+ void AbortAfterBeginCommit(MvccManager* mgr, TxnMetadata* txn_meta) {
+ // Start an op to begin committing.
+ Timestamp ts = clock_.Now();
+ ScopedOp op(mgr, ts);
+ txn_meta->set_commit_mvcc_op_timestamp(ts);
+ // Abort the commit.
+ txn_meta->set_aborted();
+ op.Abort();
+ }
+
+ // Simulates aborting without starting any ops.
+ static void AbortBeforeBeginCommit(TxnMetadata* txn_meta) {
+ txn_meta->set_aborted();
+ }
+};
+
+// Test that timestamp snapshots before and after committing correctly
+// determine whether transactions are committed.
+TEST_F(TransactionMvccTest, TestTimestampSnapshot) {
+ MvccManager mgr;
+ scoped_refptr<TxnMetadata> committed_txn_meta(new TxnMetadata);
+ scoped_refptr<TxnMetadata> aborted_before_begin_commit_txn_meta(new TxnMetadata);
+ scoped_refptr<TxnMetadata> aborted_after_begin_commit_txn_meta(new TxnMetadata);
+ Timestamp initial_ts = clock_.Now();
+ MvccSnapshot initial_snap(initial_ts);
+ ASSERT_FALSE(initial_snap.IsCommitted(*committed_txn_meta.get()));
+ ASSERT_FALSE(initial_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
+ ASSERT_FALSE(initial_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));
+
+ AbortBeforeBeginCommit(aborted_before_begin_commit_txn_meta.get());
+ AbortAfterBeginCommit(&mgr, aborted_after_begin_commit_txn_meta.get());
+ Timestamp commit_ts = Commit(&mgr, committed_txn_meta.get());
+
+ // Snapshots taken right before the commit timestamp should not return that
+ // the transaction was committed.
+ MvccSnapshot before_commit_snap(commit_ts);
+ ASSERT_FALSE(before_commit_snap.IsCommitted(*committed_txn_meta.get()));
+ ASSERT_FALSE(before_commit_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
+ ASSERT_FALSE(before_commit_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));
+
+ // Snapshots taken right after the commit timestamp shouldn't return that
+ // aborted transactions were committed.
+ MvccSnapshot after_commit_snap(Timestamp(commit_ts.value() + 1));
+ ASSERT_TRUE(after_commit_snap.IsCommitted(*committed_txn_meta.get()));
+ ASSERT_FALSE(after_commit_snap.IsCommitted(*aborted_before_begin_commit_txn_meta.get()));
+ ASSERT_FALSE(after_commit_snap.IsCommitted(*aborted_after_begin_commit_txn_meta.get()));
+}
+
+// Test that the latest snapshots before and after committing or aborting
+// correctly determine whether transactions are committed.
+TEST_F(TransactionMvccTest, TestLatestSnapshot) {
+ MvccManager mgr;
+ {
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ MvccSnapshot latest_before_commit(mgr);
+ ASSERT_FALSE(latest_before_commit.IsCommitted(*txn_meta.get()));
+ Commit(&mgr, txn_meta.get());
+ MvccSnapshot latest_after_commit(mgr);
+ ASSERT_FALSE(latest_before_commit.IsCommitted(*txn_meta.get()));
+ ASSERT_TRUE(latest_after_commit.IsCommitted(*txn_meta.get()));
+ }
+ {
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ MvccSnapshot latest_before_abort(mgr);
+ ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
+ AbortAfterBeginCommit(&mgr, txn_meta.get());
+ MvccSnapshot latest_after_abort(mgr);
+ ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
+ ASSERT_FALSE(latest_after_abort.IsCommitted(*txn_meta.get()));
+ }
+ {
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ MvccSnapshot latest_before_abort(mgr);
+ ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
+ AbortBeforeBeginCommit(txn_meta.get());
+ MvccSnapshot latest_after_abort(mgr);
+ ASSERT_FALSE(latest_before_abort.IsCommitted(*txn_meta.get()));
+ ASSERT_FALSE(latest_after_abort.IsCommitted(*txn_meta.get()));
+ }
+}
+
+enum OpType {
+ kCommit,
+ kAbortAfterBeginCommit,
+ kAbortBeforeBeginCommit,
+};
+
+class ParamedTransactionMvccTest : public TransactionMvccTest,
+ public ::testing::WithParamInterface<OpType> {};
+
+// Test that snapshots taken concurrently with commit or abort do not change
+// commit status.
+TEST_P(ParamedTransactionMvccTest, TestConcurrentLatestSnapshots) {
+ constexpr const int kNumThreads = 10;
+ Barrier b(kNumThreads + 1);
+ vector<thread> threads;
+ vector<MvccSnapshot> snaps(kNumThreads);
+ // NOTE: we really only care about bools, but vector<bool> isn't thread-safe.
+ vector<int> is_committed(kNumThreads);
+ scoped_refptr<TxnMetadata> txn_meta(new TxnMetadata);
+ MvccManager mgr;
+ for (int i = 0; i < kNumThreads; i++) {
+ threads.emplace_back([&, i] {
+ b.Wait();
+ // Take a snapshot and immediately check whether the transaction is
+ // committed.
+ snaps[i] = MvccSnapshot(mgr);
+ is_committed[i] = snaps[i].IsCommitted(*txn_meta.get());
+ });
+ }
+ b.Wait();
+ switch (GetParam()) {
+ case kCommit:
+ Commit(&mgr, txn_meta.get());
+ break;
+ case kAbortAfterBeginCommit:
+ AbortAfterBeginCommit(&mgr, txn_meta.get());
+ break;
+ case kAbortBeforeBeginCommit:
+ AbortBeforeBeginCommit(txn_meta.get());
+ break;
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+ // Take the collected snapshots and evaluate them again against the
+ // transaction metadata. The commit status should not have changed.
+ for (int i = 0; i < kNumThreads; i++) {
+ ASSERT_EQ(is_committed[i], snaps[i].IsCommitted(*txn_meta.get()));
+ }
+}
+INSTANTIATE_TEST_CASE_P(Op, ParamedTransactionMvccTest,
+ ::testing::Values(kCommit, kAbortAfterBeginCommit, kAbortBeforeBeginCommit));
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index cca1054..58ed9f5 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -27,7 +27,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
-#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/debug/trace_event.h"
@@ -49,8 +49,9 @@ using strings::Substitute;
MvccManager::MvccManager()
: new_op_timestamp_exc_lower_bound_(Timestamp::kMin),
- earliest_in_flight_(Timestamp::kMax),
+ earliest_op_in_flight_(Timestamp::kMax),
open_(true) {
+ cur_snap_.type_ = MvccSnapshot::kLatest;
cur_snap_.all_applied_before_ = Timestamp::kInitialTimestamp;
cur_snap_.none_applied_at_or_after_ = Timestamp::kInitialTimestamp;
}
@@ -70,7 +71,7 @@ void MvccManager::StartOp(Timestamp timestamp) {
"timestamp: $0, current MVCC snapshot: $1",
timestamp.ToString(), cur_snap_.ToString());
CHECK(InitOpUnlocked(timestamp)) <<
- Substitute("There is already a op with timestamp: $0 in flight, or "
+ Substitute("There is already an op with timestamp: $0 in flight, or "
"this timestamp is below or equal to the exclusive lower "
"bound for new op timestamps. Current lower bound: "
"$1, current MVCC snapshot: $2", timestamp.ToString(),
@@ -80,18 +81,17 @@ void MvccManager::StartOp(Timestamp timestamp) {
void MvccManager::StartApplyingOp(Timestamp timestamp) {
std::lock_guard<LockType> l(lock_);
- auto it = timestamps_in_flight_.find(timestamp.value());
- if (PREDICT_FALSE(it == timestamps_in_flight_.end())) {
+ auto it = ops_in_flight_.find(timestamp.value());
+ if (PREDICT_FALSE(it == ops_in_flight_.end())) {
LOG(FATAL) << "Cannot mark timestamp " << timestamp.ToString() << " as APPLYING: "
<< "not in the in-flight map.";
}
- TxnState cur_state = it->second;
+ OpState cur_state = it->second;
if (PREDICT_FALSE(cur_state != RESERVED)) {
LOG(FATAL) << "Cannot mark timestamp " << timestamp.ToString() << " as APPLYING: "
<< "wrong state: " << cur_state;
}
-
it->second = APPLYING;
}
@@ -102,18 +102,16 @@ bool MvccManager::InitOpUnlocked(const Timestamp& timestamp) {
return false;
}
- if (timestamp < earliest_in_flight_) {
- earliest_in_flight_ = timestamp;
- }
+ earliest_op_in_flight_ = std::min(timestamp, earliest_op_in_flight_);
- return InsertIfNotPresent(&timestamps_in_flight_, timestamp.value(), RESERVED);
+ return InsertIfNotPresent(&ops_in_flight_, timestamp.value(), RESERVED);
}
void MvccManager::AbortOp(Timestamp timestamp) {
std::lock_guard<LockType> l(lock_);
// Remove from our in-flight list.
- TxnState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);
+ OpState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);
// If the tablet is shutting down, we can ignore the state of the
// ops.
@@ -128,7 +126,7 @@ void MvccManager::AbortOp(Timestamp timestamp) {
// If we're aborting the earliest op that was in flight,
// update our cached value.
- if (earliest_in_flight_ == timestamp) {
+ if (earliest_op_in_flight_ == timestamp) {
AdvanceEarliestInFlightTimestamp();
}
}
@@ -150,45 +148,45 @@ void MvccManager::FinishApplyingOp(Timestamp timestamp) {
}
}
-MvccManager::TxnState MvccManager::RemoveInFlightAndGetStateUnlocked(Timestamp ts) {
+MvccManager::OpState MvccManager::RemoveInFlightAndGetStateUnlocked(Timestamp ts) {
DCHECK(lock_.is_locked());
- auto it = timestamps_in_flight_.find(ts.value());
- if (it == timestamps_in_flight_.end()) {
+ auto it = ops_in_flight_.find(ts.value());
+ if (it == ops_in_flight_.end()) {
LOG(FATAL) << "Trying to remove timestamp which isn't in the in-flight set: "
<< ts.ToString();
}
- TxnState state = it->second;
- timestamps_in_flight_.erase(it);
+ OpState state = it->second;
+ ops_in_flight_.erase(it);
return state;
}
void MvccManager::ApplyOpUnlocked(Timestamp timestamp,
- bool* was_earliest_in_flight) {
- *was_earliest_in_flight = earliest_in_flight_ == timestamp;
+ bool* was_earliest_op_in_flight) {
+ *was_earliest_op_in_flight = earliest_op_in_flight_ == timestamp;
// Remove from our in-flight list.
- TxnState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);
+ OpState old_state = RemoveInFlightAndGetStateUnlocked(timestamp);
CHECK_EQ(old_state, APPLYING)
- << "Trying to apply an op which never entered APPLYING state: "
- << timestamp.ToString() << " state=" << old_state;
+ << "Trying to apply an op which never entered APPLYING state: "
+ << timestamp.ToString() << " state=" << old_state;
// Add to snapshot's applied list
cur_snap_.AddAppliedTimestamp(timestamp);
// If we're applying the earliest op that was in flight, update our cached
// value.
- if (*was_earliest_in_flight) {
+ if (*was_earliest_op_in_flight) {
AdvanceEarliestInFlightTimestamp();
}
}
void MvccManager::AdvanceEarliestInFlightTimestamp() {
- if (timestamps_in_flight_.empty()) {
- earliest_in_flight_ = Timestamp::kMax;
+ if (ops_in_flight_.empty()) {
+ earliest_op_in_flight_ = Timestamp::kMax;
} else {
- earliest_in_flight_ = Timestamp(std::min_element(timestamps_in_flight_.begin(),
- timestamps_in_flight_.end())->first);
+ earliest_op_in_flight_ = Timestamp(std::min_element(ops_in_flight_.begin(),
+ ops_in_flight_.end())->first);
}
}
@@ -256,8 +254,8 @@ void MvccManager::AdjustCleanTimeUnlocked() {
// In either case, we have to add the newly applied ts only if it remains
// higher than the new watermark.
- if (earliest_in_flight_ < new_op_timestamp_exc_lower_bound_) {
- cur_snap_.all_applied_before_ = earliest_in_flight_;
+ if (earliest_op_in_flight_ < new_op_timestamp_exc_lower_bound_) {
+ cur_snap_.all_applied_before_ = earliest_op_in_flight_;
} else {
cur_snap_.all_applied_before_ = new_op_timestamp_exc_lower_bound_;
}
@@ -355,13 +353,13 @@ bool MvccManager::AreAllOpsAppliedUnlocked(Timestamp ts) const {
// We might not have moved 'cur_snap_.all_applied_before_' (the clean time) but 'ts'
// might still come before any possible in-flights.
- return ts < earliest_in_flight_;
+ return ts < earliest_op_in_flight_;
}
bool MvccManager::AnyApplyingAtOrBeforeUnlocked(Timestamp ts) const {
// TODO(todd) this is not actually checking on the applying ops, it's checking on
// _all in-flight_. Is this a bug?
- for (const InFlightMap::value_type entry : timestamps_in_flight_) {
+ for (const InFlightOpsMap::value_type entry : ops_in_flight_) {
if (entry.first <= ts.value()) {
return true;
}
@@ -375,8 +373,8 @@ void MvccManager::TakeSnapshot(MvccSnapshot *snap) const {
}
Status MvccManager::WaitForSnapshotWithAllApplied(Timestamp timestamp,
- MvccSnapshot* snapshot,
- const MonoTime& deadline) const {
+ MvccSnapshot* snapshot,
+ const MonoTime& deadline) const {
TRACE_EVENT0("tablet", "MvccManager::WaitForSnapshotWithAllApplied");
RETURN_NOT_OK(WaitUntil(ALL_APPLIED, timestamp, deadline));
@@ -392,7 +390,7 @@ Status MvccManager::WaitForApplyingOpsToApply() const {
Timestamp wait_for = Timestamp::kMin;
{
std::lock_guard<LockType> l(lock_);
- for (const InFlightMap::value_type entry : timestamps_in_flight_) {
+ for (const auto& entry : ops_in_flight_) {
if (entry.second == APPLYING) {
wait_for = Timestamp(std::max(entry.first, wait_for.value()));
}
@@ -416,8 +414,8 @@ Timestamp MvccManager::GetCleanTimestamp() const {
void MvccManager::GetApplyingOpsTimestamps(std::vector<Timestamp>* timestamps) const {
std::lock_guard<LockType> l(lock_);
- timestamps->reserve(timestamps_in_flight_.size());
- for (const InFlightMap::value_type entry : timestamps_in_flight_) {
+ timestamps->reserve(ops_in_flight_.size());
+ for (const auto& entry : ops_in_flight_) {
if (entry.second == APPLYING) {
timestamps->push_back(Timestamp(entry.first));
}
@@ -433,18 +431,20 @@ MvccManager::~MvccManager() {
////////////////////////////////////////////////////////////
MvccSnapshot::MvccSnapshot()
- : all_applied_before_(Timestamp::kInitialTimestamp),
+ : type_(MvccSnapshot::kTimestamp),
+ all_applied_before_(Timestamp::kInitialTimestamp),
none_applied_at_or_after_(Timestamp::kInitialTimestamp) {
}
-MvccSnapshot::MvccSnapshot(const MvccManager &manager) {
+MvccSnapshot::MvccSnapshot(const MvccManager& manager) {
manager.TakeSnapshot(this);
}
MvccSnapshot::MvccSnapshot(const Timestamp& timestamp)
- : all_applied_before_(timestamp),
+ : type_(MvccSnapshot::kTimestamp),
+ all_applied_before_(timestamp),
none_applied_at_or_after_(timestamp) {
- }
+}
MvccSnapshot MvccSnapshot::CreateSnapshotIncludingAllOps() {
return MvccSnapshot(Timestamp::kMax);
@@ -476,25 +476,9 @@ bool MvccSnapshot::MayHaveNonAppliedOpsAtOrBefore(const Timestamp& timestamp) co
}
std::string MvccSnapshot::ToString() const {
- std::string ret("MvccSnapshot[applied={T|");
-
- if (applied_timestamps_.size() == 0) {
- StrAppend(&ret, "T < ", all_applied_before_.ToString(),"}]");
- return ret;
- }
- StrAppend(&ret, "T < ", all_applied_before_.ToString(),
- " or (T in {");
-
- bool first = true;
- for (Timestamp::val_type t : applied_timestamps_) {
- if (!first) {
- ret.push_back(',');
- }
- first = false;
- StrAppend(&ret, t);
- }
- ret.append("})}]");
- return ret;
+ return Substitute("MvccSnapshot[applied={T|T < $0$1}]", all_applied_before_.ToString(),
+ applied_timestamps_.empty() ? "" :
+ Substitute(" or (T in {$0})", JoinInts(applied_timestamps_, ",")));
}
void MvccSnapshot::AddAppliedTimestamps(const std::vector<Timestamp>& timestamps) {
@@ -504,6 +488,7 @@ void MvccSnapshot::AddAppliedTimestamps(const std::vector<Timestamp>& timestamps
}
void MvccSnapshot::AddAppliedTimestamp(Timestamp timestamp) {
+ DCHECK_EQ(kLatest, type_);
if (IsApplied(timestamp)) return;
applied_timestamps_.push_back(timestamp.value());
@@ -515,6 +500,9 @@ void MvccSnapshot::AddAppliedTimestamp(Timestamp timestamp) {
}
bool MvccSnapshot::Equals(const MvccSnapshot& other) const {
+ if (type_ != other.type_) {
+ return false;
+ }
if (all_applied_before_ != other.all_applied_before_) {
return false;
}
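The watermark rule that the renamed fields feed into can be modeled without Kudu's types. The sketch below is a simplified, hypothetical stand-in: `WatermarkSnapshot`, `CleanTime`, and plain `uint64_t` timestamps are assumptions, not Kudu APIs. `IsApplied()` follows the summary rule documented in mvcc.h, and `ToString()` produces the same shape of string as the refactored `MvccSnapshot::ToString()`, with timestamps printed as raw integers. `CleanTime()` mirrors the branch in `AdjustCleanTimeUnlocked()`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, simplified model of MvccSnapshot's watermarks (not Kudu code).
struct WatermarkSnapshot {
  uint64_t all_applied_before = 0;
  uint64_t none_applied_at_or_after = 0;
  std::vector<uint64_t> applied_timestamps;

  // T is applied iff T < all_applied_before, or T is below
  // none_applied_at_or_after and was individually recorded as applied.
  bool IsApplied(uint64_t ts) const {
    if (ts < all_applied_before) return true;
    if (ts >= none_applied_at_or_after) return false;
    return std::find(applied_timestamps.begin(), applied_timestamps.end(), ts) !=
           applied_timestamps.end();
  }

  // Same layout as the refactored MvccSnapshot::ToString(): the
  // "or (T in {...})" clause appears only when there are applied timestamps
  // above the watermark.
  std::string ToString() const {
    std::ostringstream out;
    out << "MvccSnapshot[applied={T|T < " << all_applied_before;
    if (!applied_timestamps.empty()) {
      out << " or (T in {";
      for (size_t i = 0; i < applied_timestamps.size(); ++i) {
        if (i > 0) out << ',';
        out << applied_timestamps[i];
      }
      out << "})";
    }
    out << "}]";
    return out.str();
  }
};

// The clean time is the earlier of the earliest in-flight op and the
// exclusive lower bound for new op timestamps.
uint64_t CleanTime(uint64_t earliest_op_in_flight, uint64_t new_op_ts_exc_lower_bound) {
  return std::min(earliest_op_in_flight, new_op_ts_exc_lower_bound);
}
```

For example, a snapshot with `all_applied_before = 10`, `none_applied_at_or_after = 15`, and applied set `{11, 13}` treats 13 as applied but 12 as not, and prints as `MvccSnapshot[applied={T|T < 10 or (T in {11,13})}]`.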
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 65ab642..648ae71 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -23,11 +23,14 @@
#include <unordered_map>
#include <vector>
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
#include <gtest/gtest_prod.h>
#include "kudu/common/timestamp.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"
@@ -44,13 +47,17 @@ class MvccSnapshot {
public:
MvccSnapshot();
- // Create a snapshot with the current state of the given manager
- explicit MvccSnapshot(const MvccManager &manager);
+ // Create a kLatest snapshot with the current state of the given manager
+ explicit MvccSnapshot(const MvccManager& manager);
- // Create a snapshot at a specific Timestamp.
+ // Create a kTimestamp snapshot at a specific Timestamp.
//
// This snapshot considers all ops with lower timestamps to
- // be applied, and those with higher timestamps to be nonapplied.
+ // be applied, and those with higher timestamps to be non-applied.
+ //
+ // This snapshot considers all transactions with lower commit timestamps to
+ // be committed, and higher commit timestamps or no commit timestamp to be
+ // non-committed.
explicit MvccSnapshot(const Timestamp& timestamp);
// Create a snapshot which considers all ops as applied. This is mostly
@@ -75,6 +82,44 @@ class MvccSnapshot {
return IsAppliedFallback(timestamp);
}
+ // Returns true if the given transaction should be considered committed in
+ // this snapshot. Calls to this should always return the same result
+ // assuming:
+ // - Txns start MVCC ops to track when they begin committing,
+ // - Txns assign commit timestamps that are higher than their MVCC ops'
+ // timestamps,
+ // - Txns set either the aborted bit or the commit timestamp before finishing
+ // their MVCC ops.
+ // - If this is a kTimestamp snapshot, WaitUntil(ALL_APPLIED) was called with
+ // a timestamp equal to or higher than this snapshot's timestamp.
+ inline bool IsCommitted(const TxnMetadata& txn_metadata) const {
+ boost::optional<Timestamp> commit_mvcc_op_ts;
+ boost::optional<Timestamp> commit_ts;
+ txn_metadata.GetTimestamps(&commit_mvcc_op_ts, &commit_ts);
+ // If there is no commit MVCC op in the metadata, the transaction could not
+ // have been committed at the time this snapshot represents.
+ if (!commit_mvcc_op_ts) {
+ return false;
+ }
+ if (type_ == kLatest) {
+ // If the commit MVCC op finished applying, the commit process is
+ // complete, and commit status should be determined by whether there is a
+ // commit timestamp.
+ // NOTE: we must consult the metadata again *after* checking on the apply
+ // status of the MVCC op; if we were to check the abort status first, we
+ // could race with the transaction aborting, and erroneously return that
+ // an aborted transaction is committed.
+ if (IsApplied(*commit_mvcc_op_ts)) {
+ return !txn_metadata.aborted();
+ }
+ return false;
+ }
+ DCHECK_EQ(kTimestamp, type_);
+ DCHECK_EQ(all_applied_before_, none_applied_at_or_after_);
+ DCHECK(applied_timestamps_.empty());
+ return commit_ts && *commit_ts < all_applied_before_;
+ }
+
// Returns true if this snapshot may have any applied ops with timestamp
// equal to or higher than the provided 'timestamp'.
// This is mostly useful to avoid scanning REDO deltas in certain cases.
@@ -122,10 +167,28 @@ class MvccSnapshot {
FRIEND_TEST(MvccTest, TestWaitUntilAllApplied_SnapAtTimestampWithInFlights);
FRIEND_TEST(MvccTest, TestCorrectInitWithNoOps);
+ // TODO(awong): it may be worth compiling entirely separate MvccSnapshot
+ // classes for these.
+ enum SnapshotType {
+ // The snapshot is defined entirely by a single timestamp T, indicating
+ // that iterating with this snapshot should return mutations that are
+ // committed as of T.
+ kTimestamp,
+
+ // The snapshot tracks the latest ops that have been applied. Unlike a
+ // 'kTimestamp' snapshot, it may include mutations at a timestamp even though
+ // there are in-flight (non-applied) mutations at lower timestamps.
+ kLatest,
+ };
+
bool IsAppliedFallback(const Timestamp& timestamp) const;
void AddAppliedTimestamp(Timestamp timestamp);
+ // Indicates what kind of snapshot type this is, which determines how
+ // transaction commit ops are viewed in this snapshot.
+ SnapshotType type_;
+
// Summary rule:
// An op T is applied if and only if:
// T < all_applied_before_ or
@@ -133,12 +196,15 @@ class MvccSnapshot {
//
// In ASCII form, where 'C' represents an applied op,
// and 'U' represents an nonapplied one:
- //
+ // ___ none_applied_at_or_after_
+ // /
// CCCCCCCCCCCCCCCCCUUUUUCUUUCU
// | \___\___ applied_timestamps_
// |
// \- all_applied_before_
-
+ //
+ // If this is a 'kTimestamp' snapshot, 'applied_timestamps_' is empty, and
+ // 'all_applied_before_' and 'none_applied_at_or_after_' are equal.
// An op timestamp below which all ops have been applied.
// For any timestamp X, if X < all_applied_before_, then X is applied.
@@ -158,7 +224,6 @@ class MvccSnapshot {
// or none_applied_at_or_after_. So, using the compact vector structure fits
// the whole thing on one or two cache lines, and it ends up going faster.
std::vector<Timestamp::val_type> applied_timestamps_;
-
};
// Coordinator of MVCC ops. Threads wishing to make updates use
@@ -205,7 +270,7 @@ class MvccManager {
void AdjustNewOpLowerBound(Timestamp timestamp);
// Take a snapshot of the MVCC state at 'timestamp' (i.e which includes all
- // ops which have a lower timestamp)
+ // ops which have a lower timestamp).
//
// If there are any in-flight ops at a lower timestamp, waits for them to
// complete before returning.
@@ -213,8 +278,8 @@ class MvccManager {
// If 'timestamp' was marked safe before the call to this method (e.g. by TimeManager)
// then the returned snapshot is repeatable.
Status WaitForSnapshotWithAllApplied(Timestamp timestamp,
- MvccSnapshot* snapshot,
- const MonoTime& deadline) const WARN_UNUSED_RESULT;
+ MvccSnapshot* snapshot,
+ const MonoTime& deadline) const WARN_UNUSED_RESULT;
// Wait for all operations that are currently APPLYING to finish applying.
//
@@ -260,9 +325,9 @@ class MvccManager {
friend class ScopedOp;
FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnApply);
FRIEND_TEST(MvccTest, TestIllegalStateTransitionsCrash);
- FRIEND_TEST(MvccTest, TestTxnAbort);
+ FRIEND_TEST(MvccTest, TestOpAbort);
- enum TxnState {
+ enum OpState {
RESERVED,
APPLYING
};
@@ -339,15 +404,15 @@ class MvccManager {
// been achieved.
bool IsDoneWaitingUnlocked(const WaitingState& waiter) const;
- // Applies the given op.
- // Sets *was_earliest to true if this was the earliest in-flight op.
+ // Applies the given op. Sets *was_earliest_op_in_flight to true if this was
+ // the earliest in-flight op.
void ApplyOpUnlocked(Timestamp timestamp,
- bool* was_earliest_in_flight);
+ bool* was_earliest_op_in_flight);
// Remove the timestamp 'ts' from the in-flight map.
// FATALs if the ts is not in the in-flight map.
// Returns its state.
- TxnState RemoveInFlightAndGetStateUnlocked(Timestamp ts);
+ OpState RemoveInFlightAndGetStateUnlocked(Timestamp ts);
// Adjusts the clean time, i.e. the timestamp such that all ops with lower
// timestamps are applied or aborted, based on which ops are currently in
@@ -365,11 +430,13 @@ class MvccManager {
typedef simple_spinlock LockType;
mutable LockType lock_;
+ // The kLatest snapshot that gets updated with op timestamps as MVCC ops
+ // start and complete through the lifespan of this MvccManager.
MvccSnapshot cur_snap_;
// The set of timestamps corresponding to currently in-flight ops.
- typedef std::unordered_map<Timestamp::val_type, TxnState> InFlightMap;
- InFlightMap timestamps_in_flight_;
+ typedef std::unordered_map<Timestamp::val_type, OpState> InFlightOpsMap;
+ InFlightOpsMap ops_in_flight_;
// An op timestamp at and below which no new ops can be
// initialized.
@@ -379,10 +446,10 @@ class MvccManager {
// timestamp.
Timestamp new_op_timestamp_exc_lower_bound_;
- // The minimum timestamp in timestamps_in_flight_, or Timestamp::kMax
+ // The minimum timestamp in ops_in_flight_, or Timestamp::kMax
// if that set is empty. This is cached in order to avoid having to iterate
- // over timestamps_in_flight_ on every apply.
- Timestamp earliest_in_flight_;
+ // over ops_in_flight_ on every apply.
+ Timestamp earliest_op_in_flight_;
mutable std::vector<WaitingState*> waiters_;
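The two branches of `IsCommitted()` can be exercised in isolation with a simplified model. In this hypothetical sketch, `TxnView` stands in for `TxnMetadata`, `SnapshotModel` for `MvccSnapshot`, and timestamps are plain `uint64_t`. Locking, and the read-ordering concern described in the NOTE inside `IsCommitted()`, are out of scope for this single-threaded sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical view of the state IsCommitted() reads (not Kudu code).
struct TxnView {
  bool aborted = false;
  std::optional<uint64_t> commit_mvcc_op_ts;  // set when the txn begins committing
  std::optional<uint64_t> commit_ts;          // set when the commit is finalized
};

enum class SnapshotKind { kTimestamp, kLatest };

struct SnapshotModel {
  SnapshotKind kind;
  uint64_t all_applied_before;
  uint64_t none_applied_at_or_after;
  std::vector<uint64_t> applied_timestamps;  // always empty for kTimestamp

  bool IsApplied(uint64_t ts) const {
    if (ts < all_applied_before) return true;
    if (ts >= none_applied_at_or_after) return false;
    return std::find(applied_timestamps.begin(), applied_timestamps.end(), ts) !=
           applied_timestamps.end();
  }

  bool IsCommitted(const TxnView& txn) const {
    // No commit op: the txn had not begun committing, so it is not committed.
    if (!txn.commit_mvcc_op_ts) return false;
    if (kind == SnapshotKind::kLatest) {
      // Once the commit op is applied, the txn has either set its abort bit
      // or recorded a commit timestamp; only the abort bit decides here.
      return IsApplied(*txn.commit_mvcc_op_ts) && !txn.aborted;
    }
    // kTimestamp snapshot at T: committed iff a commit timestamp exists and
    // is strictly below T.
    return txn.commit_ts && *txn.commit_ts < all_applied_before;
  }
};

// Builds a "clean" snapshot defined by the single timestamp 't'.
SnapshotModel AtTimestamp(uint64_t t) {
  return {SnapshotKind::kTimestamp, t, t, {}};
}
```

With a commit op at 10 and a commit timestamp of 12, a kLatest snapshot that has applied op 10 reports the transaction committed. A kTimestamp snapshot at 12 does not, because the commit timestamp must be strictly below T, while one at 13 does.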
diff --git a/src/kudu/tablet/tablet_metadata-test.cc b/src/kudu/tablet/tablet_metadata-test.cc
index 40d2f2e..254b4cb 100644
--- a/src/kudu/tablet/tablet_metadata-test.cc
+++ b/src/kudu/tablet/tablet_metadata-test.cc
@@ -46,6 +46,7 @@
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
@@ -260,7 +261,7 @@ TEST_F(TestTabletMetadata, TestTxnMetadata) {
const auto& committed_txn = FindOrDie(txn_metas, kCommittedTxnId);
ASSERT_FALSE(committed_txn->aborted());
ASSERT_NE(boost::none, committed_txn->commit_timestamp());
- ASSERT_EQ(kDummyTimestamp.value(), *committed_txn->commit_timestamp());
+ ASSERT_EQ(kDummyTimestamp, *committed_txn->commit_timestamp());
const auto& aborted_txn = FindOrDie(txn_metas, kAbortedTxnId);
ASSERT_TRUE(aborted_txn->aborted());
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 4d9405e..699d98b 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -27,6 +27,7 @@
#include <utility>
#include <boost/optional/optional.hpp>
+#include <boost/type_traits/decay.hpp>
#include <gflags/gflags.h>
#include <google/protobuf/stubs/port.h>
@@ -34,9 +35,9 @@
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
-#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
@@ -48,6 +49,7 @@
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/rowset_metadata.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
@@ -504,7 +506,7 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
new TxnMetadata(
txn_meta.has_aborted() && txn_meta.aborted(),
txn_meta.has_commit_timestamp() ?
- boost::make_optional(txn_meta.commit_timestamp()) :
+ boost::make_optional(Timestamp(txn_meta.commit_timestamp())) :
boost::none
));
}
@@ -738,7 +740,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
const auto& txn_meta = txn_id_and_metadata.second;
const auto& commit_ts = txn_meta->commit_timestamp();
if (commit_ts) {
- meta_pb.set_commit_timestamp(*commit_ts);
+ meta_pb.set_commit_timestamp(commit_ts->value());
}
if (txn_meta->aborted()) {
meta_pb.set_aborted(true);
@@ -805,7 +807,7 @@ void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timesta
std::lock_guard<LockType> l(data_lock_);
auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
CHECK(txn_metadata);
- txn_metadata->set_commit_timestamp(commit_timestamp.value());
+ txn_metadata->set_commit_timestamp(commit_timestamp);
anchors_needing_flush_.emplace_back(std::move(log_anchor));
}
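Because `TxnMetadata` now holds `Timestamp` rather than `int64_t`, the superblock load and flush paths wrap and unwrap the raw integer. Below is a minimal sketch of that round trip. It uses hypothetical stand-ins: `Ts` for `kudu::Timestamp`, and `TxnMetaPB` for the generated protobuf message, with an assumed `uint64_t` field. It also uses `std::optional` in place of `boost::optional`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for kudu::Timestamp: a typed wrapper over an integer.
class Ts {
 public:
  explicit Ts(uint64_t v) : v_(v) {}
  uint64_t value() const { return v_; }

 private:
  uint64_t v_;
};

// Hypothetical stand-in for the protobuf message; the field type is assumed.
struct TxnMetaPB {
  bool has_commit_timestamp = false;
  uint64_t commit_timestamp = 0;
  void set_commit_timestamp(uint64_t v) {
    has_commit_timestamp = true;
    commit_timestamp = v;
  }
};

// Load path: wrap the raw integer in the typed timestamp, or leave it unset.
std::optional<Ts> CommitTsFromPB(const TxnMetaPB& pb) {
  return pb.has_commit_timestamp ? std::make_optional(Ts(pb.commit_timestamp))
                                 : std::nullopt;
}

// Flush path: unwrap the typed timestamp only if one is set.
void CommitTsToPB(const std::optional<Ts>& commit_ts, TxnMetaPB* pb) {
  if (commit_ts) {
    pb->set_commit_timestamp(commit_ts->value());
  }
}
```

Keeping the typed wrapper inside the in-memory metadata and converting only at the serialization boundary means comparisons such as `*commit_ts < all_applied_before_` operate on `Timestamp` values directly.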
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index b6eeae8..e8fbb3f 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -19,14 +19,11 @@
#include <atomic>
#include <cstdint>
#include <memory>
-#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
-#include <utility>
#include <vector>
-#include <boost/none_t.hpp>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
@@ -60,48 +57,13 @@ class MinLogIndexAnchorer;
namespace tablet {
class RowSetMetadata;
+class TxnMetadata;
typedef std::vector<std::shared_ptr<RowSetMetadata> > RowSetMetadataVector;
typedef std::unordered_set<int64_t> RowSetMetadataIds;
extern const int64_t kNoDurableMemStore;
-// Encapsulates the persistent state associated with a transaction.
-class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> {
- public:
- TxnMetadata(bool aborted = false,
- boost::optional<int64_t> commit_ts = boost::none)
- : aborted_(aborted),
- commit_timestamp_(std::move(commit_ts)) {}
- void set_aborted() {
- std::lock_guard<simple_spinlock> l(lock_);
- CHECK(boost::none == commit_timestamp_);
- aborted_ = true;
- }
- void set_commit_timestamp(int64_t commit_ts) {
- std::lock_guard<simple_spinlock> l(lock_);
- CHECK(boost::none == commit_timestamp_);
- commit_timestamp_ = commit_ts;
- }
-
- bool aborted() const {
- std::lock_guard<simple_spinlock> l(lock_);
- return aborted_;
- }
- boost::optional<int64_t> commit_timestamp() const {
- std::lock_guard<simple_spinlock> l(lock_);
- return commit_timestamp_;
- }
-
- private:
- friend class RefCountedThreadSafe<TxnMetadata>;
- ~TxnMetadata() = default;
-
- mutable simple_spinlock lock_;
- bool aborted_;
- boost::optional<int64_t> commit_timestamp_;
-};
-
// Manages the "blocks tracking" for the specified tablet.
//
// TabletMetadata is owned by the Tablet. As new blocks are written to store
diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h
new file mode 100644
index 0000000..ea2708b
--- /dev/null
+++ b/src/kudu/tablet/txn_metadata.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <glog/logging.h>
+#include <boost/optional/optional.hpp>
+
+#include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+namespace tablet {
+
+// Encapsulates the persistent state associated with a transaction.
+class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> {
+ public:
+ // TODO(awong): add commit_mvcc_op_timestamp to the constructor when reading
+ // from TxnMetadataPB.
+ explicit TxnMetadata(bool aborted = false,
+ boost::optional<Timestamp> commit_ts = boost::none)
+ : aborted_(aborted),
+ commit_mvcc_op_timestamp_(boost::none),
+ commit_timestamp_(std::move(commit_ts)) {}
+ void set_aborted() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ CHECK(boost::none == commit_timestamp_);
+ aborted_ = true;
+ }
+ void set_commit_timestamp(Timestamp commit_ts) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ CHECK(boost::none == commit_timestamp_);
+ CHECK(!aborted_);
+ commit_timestamp_ = commit_ts;
+ }
+ void set_commit_mvcc_op_timestamp(Timestamp op_ts) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ CHECK(boost::none == commit_timestamp_);
+ CHECK(boost::none == commit_mvcc_op_timestamp_);
+ commit_mvcc_op_timestamp_ = op_ts;
+ }
+
+ bool aborted() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return aborted_;
+ }
+ boost::optional<Timestamp> commit_timestamp() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return commit_timestamp_;
+ }
+ boost::optional<Timestamp> commit_mvcc_op_timestamp() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return commit_mvcc_op_timestamp_;
+ }
+
+ void GetTimestamps(boost::optional<Timestamp>* op_ts,
+ boost::optional<Timestamp>* commit_ts) const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ *op_ts = commit_mvcc_op_timestamp_;
+ *commit_ts = commit_timestamp_;
+ }
+
+ private:
+ friend class RefCountedThreadSafe<TxnMetadata>;
+ ~TxnMetadata() = default;
+
+ // Protects access to all members below.
+ mutable simple_spinlock lock_;
+ bool aborted_;
+
+ // If the MVCC op with this timestamp is considered applied, either
+ // 'aborted_' is set to true, or 'commit_timestamp_' is set.
+ boost::optional<Timestamp> commit_mvcc_op_timestamp_;
+
+ boost::optional<Timestamp> commit_timestamp_;
+};
+
+} // namespace tablet
+} // namespace kudu
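For readers without the Kudu tree at hand, the same state machine can be sketched with the standard library. This stand-in rests on several assumptions and is not the class above. `std::mutex` replaces `simple_spinlock`, and `uint64_t` replaces `Timestamp`. `assert()` replaces glog's `CHECK()`, so unlike `CHECK()` the checks vanish under `NDEBUG`. As in the original, `GetTimestamps()` takes the lock once, so both values are read as of the same instant rather than in two separate critical sections.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>

// Hypothetical, dependency-free sketch of TxnMetadata's invariants.
class TxnMetadataSketch {
 public:
  // Recorded when the txn begins committing; must precede any commit timestamp.
  void set_commit_mvcc_op_timestamp(uint64_t op_ts) {
    std::lock_guard<std::mutex> l(lock_);
    assert(!commit_timestamp_ && !commit_mvcc_op_timestamp_);
    commit_mvcc_op_timestamp_ = op_ts;
  }

  // A commit timestamp may be set at most once, and never on an aborted txn.
  void set_commit_timestamp(uint64_t commit_ts) {
    std::lock_guard<std::mutex> l(lock_);
    assert(!commit_timestamp_ && !aborted_);
    commit_timestamp_ = commit_ts;
  }

  // A txn with a commit timestamp can no longer abort.
  void set_aborted() {
    std::lock_guard<std::mutex> l(lock_);
    assert(!commit_timestamp_);
    aborted_ = true;
  }

  bool aborted() const {
    std::lock_guard<std::mutex> l(lock_);
    return aborted_;
  }

  // Reads both timestamps under a single lock acquisition.
  void GetTimestamps(std::optional<uint64_t>* op_ts,
                     std::optional<uint64_t>* commit_ts) const {
    std::lock_guard<std::mutex> l(lock_);
    *op_ts = commit_mvcc_op_timestamp_;
    *commit_ts = commit_timestamp_;
  }

 private:
  mutable std::mutex lock_;
  bool aborted_ = false;
  std::optional<uint64_t> commit_mvcc_op_timestamp_;
  std::optional<uint64_t> commit_timestamp_;
};
```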
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index eebf2be..d498fc9 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -26,9 +26,11 @@
#include <boost/optional/optional.hpp>
+#include "kudu/common/timestamp.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/txn_metadata.h"
using kudu::log::LogAnchorRegistry;
using std::vector;
@@ -120,7 +122,7 @@ vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
const auto& commit_ts = txn_meta->commit_timestamp();
if (commit_ts) {
txn_entry.state = Txn::kCommitted;
- txn_entry.commit_timestamp = *commit_ts;
+ txn_entry.commit_timestamp = commit_ts->value();
continue;
}
}
@@ -140,8 +142,8 @@ vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
const auto& commit_ts = txn_meta->commit_timestamp();
if (commit_ts) {
txn_entry.state = Txn::kCommitted;
- txn_entry.commit_timestamp = *commit_ts;
- txns.emplace_back(std::move(txn_entry));
+ txn_entry.commit_timestamp = commit_ts->value();
+ txns.emplace_back(txn_entry);
continue;
}
}