KUDU-798 (part 2) Remove automatic safe time adjustment from mvcc
This patch completely removes the APIs that allow automatic timestamp
assignment and automatic safe time adjustment from Mvcc.
The previous patch took care of making sure these APIs were
unused in regular TabletServer operations. This patch completes
that by removing the APIs from Mvcc and changing tests appropriately.
Because, for non-tserver based tests, timestamp assignment and
transaction start are now done under a lock, this does introduce
a slowdown in tests like mt-tablet-test. I measured about a 10%
slowdown on an mvcc-stressing config of mt-tablet-test (run config
and perf stats at the end of this commit message).
However, this patch series does not add any overhead on the actual
tablet server write path. In fact, it even simplifies it in some
circumstances: for instance, we were advancing safe time twice for
leader-side txns, once when the txn was consensus committed and once
when the apply was complete. Due to this, there was even a moderate
speedup of about 9% in this more important case, measured with
full_stack-insert-scan-test (run config and perf stats below).
===================================================================
full_stack-insert-scan-test run config:
bin/full_stack-insert-scan-test \
--gtest_filter=*MRSOnlyStressTest* \
--inserts_per_client=200000 \
--concurrent_inserts=10 \
--rows_per_batch=1 \
--skip_scans
Results on master before this patch series:
344086.766163 task-clock # 5.659 CPUs utilized
14,435,389 context-switches # 0.042 M/sec
91,225 cpu-migrations # 0.265 K/sec
112,036 page-faults # 0.326 K/sec
956,637,266,040 cycles # 2.780 GHz
<not supported> stalled-cycles-frontend
<not supported> stalled-cycles-backend
577,637,169,921 instructions # 0.60 insns per cycle
109,720,205,884 branches # 318.874 M/sec
730,807,372 branch-misses # 0.67% of all branches
60.807013290 seconds time elapsed
Results on master after this patch series:
328574.272742 task-clock # 5.932 CPUs utilized
13,820,330 context-switches # 0.042 M/sec
170,370 cpu-migrations # 0.519 K/sec
111,997 page-faults # 0.341 K/sec
911,264,247,114 cycles # 2.773 GHz
<not supported> stalled-cycles-frontend
<not supported> stalled-cycles-backend
572,898,176,369 instructions # 0.63 insns per cycle
108,872,113,035 branches # 331.347 M/sec
728,934,228 branch-misses # 0.67% of all branches
55.387672052 seconds time elapsed
===================================================================
mt-tablet-test run config:
bin/mt-tablet-test \
--gtest_filter=*0*DoTestAllAtOnce* \
--num_counter_threads=0 \
--num_slowreader_threads=0 \
--flusher_backoff=1.0 \
--flusher_initial_frequency_ms=10000 \
--inserts_per_thread=1000000 \
--num_summer_threads=0 \
--gtest_repeat=3 \
--tablet_test_flush_threshold_mb=2000 \
--minloglevel=10
Results on master before this patch series:
618894.806683 task-clock # 7.716 CPUs utilized
7,243,713 context-switches # 0.012 M/sec
782 cpu-migrations # 0.001 K/sec
1,457,677 page-faults # 0.002 M/sec
1,794,712,092,284 cycles # 2.900 GHz
<not supported> stalled-cycles-frontend
<not supported> stalled-cycles-backend
648,757,335,519 instructions # 0.36 insns per cycle
128,766,989,780 branches # 208.060 M/sec
790,345,583 branch-misses # 0.61% of all branches
80.204388663 seconds time elapsed
Results on master after this patch series:
620594.911121 task-clock # 7.012 CPUs utilized
17,645,922 context-switches # 0.028 M/sec
1,163 cpu-migrations # 0.002 K/sec
1,252,812 page-faults # 0.002 M/sec
1,775,937,664,119 cycles # 2.862 GHz
<not supported> stalled-cycles-frontend
<not supported> stalled-cycles-backend
757,882,564,990 instructions # 0.43 insns per cycle
155,500,757,477 branches # 250.567 M/sec
982,110,478 branch-misses # 0.63% of all branches
88.500973260 seconds time elapsed
Change-Id: I6cd0b8aa52e4b0f5c0f4872d806e76387f6b9538
Reviewed-on: http://gerrit.cloudera.org:8080/5057
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cc120dac
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cc120dac
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cc120dac
Branch: refs/heads/master
Commit: cc120dac79fe2d8e8f5b8656ad7d6ad0f29732cd
Parents: b8093f0
Author: David Alves
Authored: Sat Nov 26 14:10:23 2016 -0800
Committer: David Ribeiro Alves
Committed: Tue Nov 29 21:54:09 2016 +0000
----------------------------------------------------------------------
src/kudu/tablet/compaction-test.cc | 11 +-
src/kudu/tablet/deltamemstore-test.cc | 24 +--
src/kudu/tablet/diskrowset-test-base.h | 8 +-
src/kudu/tablet/diskrowset-test.cc | 2 +-
src/kudu/tablet/local_tablet_writer.h | 1 +
src/kudu/tablet/memrowset-test.cc | 19 +--
src/kudu/tablet/mvcc-test.cc | 139 ++++++++++-------
src/kudu/tablet/mvcc.cc | 156 ++++---------------
src/kudu/tablet/mvcc.h | 94 +++--------
src/kudu/tablet/tablet.cc | 21 ++-
src/kudu/tablet/tablet.h | 9 +-
src/kudu/tablet/tablet_bootstrap.cc | 4 +-
.../tablet/transactions/transaction_driver.cc | 2 +-
13 files changed, 196 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/compaction-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 99cd49f..5b8d160 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -70,8 +70,8 @@ class TestCompaction : public KuduRowSetTest {
: KuduRowSetTest(CreateSchema()),
op_id_(consensus::MaximumOpId()),
row_builder_(schema_),
- mvcc_(scoped_refptr<server::Clock>(
- server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp))),
+ clock_(server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+ mvcc_(clock_),
log_anchor_registry_(new log::LogAnchorRegistry()) {
}
@@ -95,7 +95,7 @@ class TestCompaction : public KuduRowSetTest {
// The 'nullable_val' column is set to either NULL (when val is odd)
// or 'val' (when val is even).
void InsertRow(MemRowSet *mrs, int row_key, int32_t val) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
row_builder_.Reset();
snprintf(key_buf_, sizeof(key_buf_), kRowKeyFormat, row_key);
@@ -134,7 +134,7 @@ class TestCompaction : public KuduRowSetTest {
ColumnId nullable_col_id =
schema_.column_id(schema_.find_column("nullable_val"));
for (uint32_t i = 0; i < n_rows; i++) {
SCOPED_TRACE(i);
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, i * 10 + delta);
@@ -169,7 +169,7 @@ class TestCompaction : public KuduRowSetTest {
faststring update_buf;
for (uint32_t i = 0; i < n_rows; i++) {
SCOPED_TRACE(i);
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, i * 10 + delta);
@@ -397,6 +397,7 @@ class TestCompaction : public KuduRowSetTest {
RowBuilder row_builder_;
char key_buf_[256];
+ scoped_refptr<server::Clock> clock_;
MvccManager mvcc_;
scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/deltamemstore-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index 389225b..1ecfe5c 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -49,8 +49,8 @@ class TestDeltaMemStore : public KuduTest {
TestDeltaMemStore()
: op_id_(consensus::MaximumOpId()),
schema_(CreateSchema()),
- mvcc_(scoped_refptr<server::Clock>(
- server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp))) {
+ clock_(server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+ mvcc_(clock_) {
CHECK_OK(DeltaMemStore::Create(0, 0,
new log::LogAnchorRegistry(),
MemTracker::GetRootTracker(), &dms_));
@@ -79,7 +79,7 @@ class TestDeltaMemStore : public KuduTest {
RowChangeListEncoder update(&buf);
for (uint32_t idx_to_update : indexes_to_update) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
update.Reset();
uint32_t new_val = idx_to_update * 10;
@@ -122,6 +122,7 @@ class TestDeltaMemStore : public KuduTest {
const Schema schema_;
shared_ptr<DeltaMemStore> dms_;
+ scoped_refptr<server::Clock> clock_;
MvccManager mvcc_;
gscoped_ptr<FsManager> fs_manager_;
};
@@ -154,7 +155,7 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
schema_.column_id(kStringColumn), &s);
}
if (idx % 2 == 0) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
uint32_t new_val = idx * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
@@ -224,7 +225,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
faststring buf;
RowChangeListEncoder update(&buf);
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
string str(kStringDataSize, 'x');
Slice s(str);
@@ -233,6 +234,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
CHECK_OK(dms_->Update(tx.timestamp(), kIdxToUpdate, RowChangeList(buf),
op_id_));
tx.Commit();
}
+ mvcc_.AdjustSafeTime(clock_->Now());
MvccSnapshot snap(mvcc_);
LOG_TIMING(INFO, "Applying updates") {
@@ -258,7 +260,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
// the update gets cleared after usage. This ensures that the
// underlying data is properly copied into the DMS arena.
{
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
char buf[256] = "update 1";
Slice s(buf);
@@ -272,7 +274,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
// Update the same cell again with a different value
{
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
char buf[256] = "update 2";
Slice s(buf);
@@ -309,8 +311,8 @@ TEST_F(TestDeltaMemStore, TestOutOfOrderTxns) {
RowChangeListEncoder update(&update_buf);
{
- ScopedTransaction tx1(&mvcc_);
- ScopedTransaction tx2(&mvcc_);
+ ScopedTransaction tx1(&mvcc_, clock_->Now());
+ ScopedTransaction tx2(&mvcc_, clock_->Now());
tx2.StartApplying();
Slice s("update 2");
@@ -344,7 +346,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
char buf[256];
for (uint32_t i = 0; i < 1000; i++) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
update.Reset();
@@ -388,7 +390,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
// these are separate transactions and we need to maintain the
// old ones for snapshot consistency purposes.
for (uint32_t i = 0; i < 1000; i++) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
update.Reset();
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/diskrowset-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 6245fc8..dae3bdb 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -34,6 +34,7 @@
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/gutil/stringprintf.h"
+#include "kudu/server/clock.h"
#include "kudu/server/logical_clock.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/tablet-test-util.h"
@@ -60,8 +61,8 @@ class TestRowSet : public KuduRowSetTest {
: KuduRowSetTest(CreateTestSchema()),
n_rows_(FLAGS_roundtrip_num_rows),
op_id_(consensus::MaximumOpId()),
- mvcc_(scoped_refptr<server::Clock>(
- server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp))) {
+ clock_(server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+ mvcc_(clock_) {
CHECK_GT(n_rows_, 0);
}
@@ -186,7 +187,7 @@ class TestRowSet : public KuduRowSetTest {
RowSetKeyProbe probe(rb.row());
ProbeStats stats;
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
Status s = rs->MutateRow(tx.timestamp(), probe, mutation, op_id_, &stats,
result);
tx.Commit();
@@ -335,6 +336,7 @@ class TestRowSet : public KuduRowSetTest {
size_t n_rows_;
consensus::OpId op_id_; // Generally a "fake" OpId for these tests.
+ scoped_refptr<server::Clock> clock_;
MvccManager mvcc_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/diskrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index bc5396d..9f2c0d2 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -332,7 +332,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
RowChangeListEncoder update(&update_buf);
for (uint32_t i = 2; i <= 5; i++) {
{
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
update.Reset();
update.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &i);
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/local_tablet_writer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/local_tablet_writer.h b/src/kudu/tablet/local_tablet_writer.h
index ee9fbdb..e0d3371 100644
--- a/src/kudu/tablet/local_tablet_writer.h
+++ b/src/kudu/tablet/local_tablet_writer.h
@@ -103,6 +103,7 @@ class LocalTabletWriter {
tablet_->ApplyRowOperations(tx_state_.get());
tx_state_->ReleaseTxResultPB(&result_);
+ tablet_->mvcc_manager()->AdjustSafeTime(tx_state_->timestamp());
tx_state_->CommitOrAbort(Transaction::COMMITTED);
// Return the status of first failed op.
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/memrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 02c83eb..a3c7057 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -49,8 +49,8 @@ class TestMemRowSet : public ::testing::Test {
log_anchor_registry_(new LogAnchorRegistry()),
schema_(CreateSchema()),
key_schema_(schema_.CreateKeyProjection()),
- mvcc_(scoped_refptr<server::Clock>(
- server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp))) {
+ clock_(server::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+ mvcc_(clock_) {
FLAGS_enable_data_block_fsync = false; // Keep unit tests fast.
}
@@ -107,7 +107,7 @@ class TestMemRowSet : public ::testing::Test {
}
Status InsertRow(MemRowSet *mrs, const string &key, uint32_t val) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
RowBuilder rb(schema_);
rb.AddString(key);
rb.AddUint32(val);
@@ -121,7 +121,7 @@ class TestMemRowSet : public ::testing::Test {
const string &key,
uint32_t new_val,
OperationResultPB* result) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
mutation_buf_.clear();
@@ -143,7 +143,7 @@ class TestMemRowSet : public ::testing::Test {
}
Status DeleteRow(MemRowSet *mrs, const string &key, OperationResultPB* result) {
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
mutation_buf_.clear();
@@ -184,6 +184,7 @@ class TestMemRowSet : public ::testing::Test {
faststring mutation_buf_;
const Schema schema_;
const Schema key_schema_;
+ scoped_refptr<server::Clock> clock_;
MvccManager mvcc_;
};
@@ -231,7 +232,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
RowBuilder rb(compound_key_schema);
{
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
rb.AddString(string("hello world"));
rb.AddInt32(1);
@@ -242,7 +243,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
}
{
- ScopedTransaction tx2(&mvcc_);
+ ScopedTransaction tx2(&mvcc_, clock_->Now());
tx2.StartApplying();
rb.Reset();
rb.AddString(string("goodbye world"));
@@ -254,7 +255,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
}
{
- ScopedTransaction tx3(&mvcc_);
+ ScopedTransaction tx3(&mvcc_, clock_->Now());
tx3.StartApplying();
rb.Reset();
rb.AddString(string("goodbye world"));
@@ -463,7 +464,7 @@ TEST_F(TestMemRowSet, TestInsertionMVCC) {
// Insert 5 rows in tx 0 through 4
for (uint32_t i = 0; i < 5; i++) {
{
- ScopedTransaction tx(&mvcc_);
+ ScopedTransaction tx(&mvcc_, clock_->Now());
tx.StartApplying();
RowBuilder rb(schema_);
char keybuf[256];
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/mvcc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index 1f036e9..d24c108 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -72,8 +72,9 @@ TEST_F(MvccTest, TestMvccBasic) {
ASSERT_FALSE(snap.IsCommitted(Timestamp(2)));
// Start timestamp 1
- Timestamp t = mgr.StartTransaction();
+ Timestamp t = clock_->Now();
ASSERT_EQ(1, t.value());
+ mgr.StartTransaction(t);
// State should still have no committed transactions, since 1 is in-flight.
mgr.TakeSnapshot(&snap);
@@ -101,11 +102,12 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
MvccManager mgr(clock_.get());
MvccSnapshot snap;
- // Start timestamp 1, timestamp 2
- Timestamp t1 = mgr.StartTransaction();
+ Timestamp t1 = clock_->Now();
ASSERT_EQ(1, t1.value());
- Timestamp t2 = mgr.StartTransaction();
+ mgr.StartTransaction(t1);
+ Timestamp t2 = clock_->Now();
ASSERT_EQ(2, t2.value());
+ mgr.StartTransaction(t2);
// State should still have no committed transactions, since both are in-flight.
@@ -127,8 +129,9 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
ASSERT_TRUE(snap.IsCommitted(t2));
// Start another transaction. This gets timestamp 3
- Timestamp t3 = mgr.StartTransaction();
+ Timestamp t3 = clock_->Now();
ASSERT_EQ(3, t3.value());
+ mgr.StartTransaction(t3);
// State should show 2 as committed, 1 and 4 as uncommitted.
mgr.TakeSnapshot(&snap);
@@ -156,6 +159,9 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
mgr.StartApplyingTransaction(t1);
mgr.CommitTransaction(t1);
+ // All transactions are committed, so adjust the safe time.
+ mgr.AdjustSafeTime(t3);
+
// all committed
mgr.TakeSnapshot(&snap);
ASSERT_EQ("MvccSnapshot[committed={T|T < 3 or (T in {3})}]",
snap.ToString());
@@ -170,19 +176,22 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
MvccManager mgr(hybrid_clock);
// Start a normal non-commit-wait txn.
- Timestamp normal_txn = mgr.StartTransaction();
+ Timestamp normal_txn = hybrid_clock->Now();
+ mgr.StartTransaction(normal_txn);
MvccSnapshot s1(mgr);
// Start a transaction as if it were using commit-wait (i.e. started in future)
- Timestamp cw_txn = mgr.StartTransactionAtLatest();
+ Timestamp cw_txn = hybrid_clock->NowLatest();
+ mgr.StartTransaction(cw_txn);
// Commit the original txn
mgr.StartApplyingTransaction(normal_txn);
mgr.CommitTransaction(normal_txn);
// Start a new txn
- Timestamp normal_txn_2 = mgr.StartTransaction();
+ Timestamp normal_txn_2 = hybrid_clock->Now();
+ mgr.StartTransaction(normal_txn_2);
// The old snapshot should not have either txn
EXPECT_FALSE(s1.IsCommitted(normal_txn));
@@ -203,39 +212,37 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
EXPECT_FALSE(s3.IsCommitted(normal_txn_2));
}
-// Tests starting transaction at a point-in-time in the past and committing them.
-// This is disconnected from the current time (whatever is returned from clock->Now())
-// for replication/bootstrap.
-TEST_F(MvccTest, TestOfflineTransactions) {
+// Tests starting a transaction at a point-in-time in the past and committing it while
+// adjusting safe time.
+TEST_F(MvccTest, TestSafeTimeWithOutOfOrderTxns) {
MvccManager mgr(clock_.get());
- // set the clock to some time in the "future"
+ // Set the clock to some time in the "future".
ASSERT_OK(clock_->Update(Timestamp(100)));
- // now start a transaction in the "past"
- ASSERT_OK(mgr.StartTransactionAtTimestamp(Timestamp(50)));
+ // Start a transaction in the "past"
+ Timestamp txn_in_the_past(50);
+ mgr.StartTransaction(txn_in_the_past);
+ mgr.StartApplyingTransaction(txn_in_the_past);
ASSERT_EQ(Timestamp::kInitialTimestamp, mgr.GetCleanTimestamp());
- // and committing this transaction "offline" this
- // should not advance the MvccManager 'all_committed_before_'
- // watermark.
- mgr.StartApplyingTransaction(Timestamp(50));
- mgr.OfflineCommitTransaction(Timestamp(50));
+ // Committing 'txn_in_the_past' should not advance safe time or clean time.
+ mgr.CommitTransaction(txn_in_the_past);
- // Now take a snaphsot.
+ // Now take a snapshot.
MvccSnapshot snap1;
mgr.TakeSnapshot(&snap1);
- // Because we did not advance the watermark, even though the only
- // in-flight transaction was committed at time 50, a transaction at
- // time 40 should still be considered uncommitted.
+ // Because we did not advance the safe or clean watermarks, even though the only
+ // in-flight transaction was committed at time 50, a transaction at time 40 should still be
+ // considered uncommitted.
ASSERT_FALSE(snap1.IsCommitted(Timestamp(40)));
- // Now advance the watermark to the last committed transaction.
- mgr.OfflineAdjustSafeTime(Timestamp(50));
+ // Now advance both the clean and safe watermarks to the last committed transaction.
+ mgr.AdjustSafeTime(Timestamp(50));
- ASSERT_EQ(Timestamp(50), mgr.GetCleanTimestamp());
+ ASSERT_EQ(txn_in_the_past, mgr.GetCleanTimestamp());
MvccSnapshot snap2;
mgr.TakeSnapshot(&snap2);
@@ -248,8 +255,8 @@ TEST_F(MvccTest, TestScopedTransaction) {
MvccSnapshot snap;
{
- ScopedTransaction t1(&mgr);
- ScopedTransaction t2(&mgr);
+ ScopedTransaction t1(&mgr, clock_->Now());
+ ScopedTransaction t2(&mgr, clock_->Now());
ASSERT_EQ(1, t1.timestamp().value());
ASSERT_EQ(2, t2.timestamp().value());
@@ -365,9 +372,12 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
MvccManager mgr(clock_.get());
// start several transactions and take snapshots along the way
- Timestamp tx1 = mgr.StartTransaction();
- Timestamp tx2 = mgr.StartTransaction();
- Timestamp tx3 = mgr.StartTransaction();
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
+ Timestamp tx2 = clock_->Now();
+ mgr.StartTransaction(tx2);
+ Timestamp tx3 = clock_->Now();
+ mgr.StartTransaction(tx3);
ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(1)));
ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(2)));
@@ -410,8 +420,11 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
MvccManager mgr(clock_.get());
- Timestamp tx1 = mgr.StartTransaction();
- Timestamp tx2 = mgr.StartTransaction();
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
+ Timestamp tx2 = clock_->Now();
+ mgr.StartTransaction(tx2);
+ mgr.AdjustSafeTime(tx2);
thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this,
&mgr, clock_->Now());
@@ -428,8 +441,11 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithInFlights) {
TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
MvccManager mgr(clock_.get());
- Timestamp tx1 = mgr.StartTransaction();
- Timestamp tx2 = mgr.StartTransaction();
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
+ Timestamp tx2 = clock_->Now();
+ mgr.StartTransaction(tx2);
+ mgr.AdjustSafeTime(tx2);
// Wait should return immediately, since we have no transactions "applying"
// yet.
@@ -458,9 +474,13 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
MvccManager mgr(clock_.get());
// Transactions with timestamp 1 through 3
- Timestamp tx1 = mgr.StartTransaction();
- Timestamp tx2 = mgr.StartTransaction();
- Timestamp tx3 = mgr.StartTransaction();
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
+ Timestamp tx2 = clock_->Now();
+ mgr.StartTransaction(tx2);
+ Timestamp tx3 = clock_->Now();
+ mgr.StartTransaction(tx3);
+ mgr.AdjustSafeTime(tx3);
// Start a thread waiting for transactions with ts <= 2 to commit
thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this,
&mgr, tx2);
@@ -492,9 +512,13 @@ TEST_F(MvccTest, TestTxnAbort) {
MvccManager mgr(clock_.get());
// Transactions with timestamps 1 through 3
- Timestamp tx1 = mgr.StartTransaction();
- Timestamp tx2 = mgr.StartTransaction();
- Timestamp tx3 = mgr.StartTransaction();
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
+ Timestamp tx2 = clock_->Now();
+ mgr.StartTransaction(tx2);
+ Timestamp tx3 = clock_->Now();
+ mgr.StartTransaction(tx3);
+ mgr.AdjustSafeTime(tx3);
// Now abort tx1, this shouldn't move the clean time and the transaction
// shouldn't be reported as committed.
@@ -503,12 +527,11 @@ TEST_F(MvccTest, TestTxnAbort) {
ASSERT_FALSE(mgr.cur_snap_.IsCommitted(tx1));
// Committing tx3 shouldn't advance the clean time since it is not the earliest
- // in-flight, but it should advance 'no_new_transactions_at_or_before_', the "safe"
- // time, to 3.
+ // in-flight, but it should advance 'safe_time_' to 3.
mgr.StartApplyingTransaction(tx3);
mgr.CommitTransaction(tx3);
ASSERT_TRUE(mgr.cur_snap_.IsCommitted(tx3));
- ASSERT_EQ(tx3, mgr.no_new_transactions_at_or_before_);
+ ASSERT_EQ(tx3, mgr.safe_time_);
// Committing tx2 should advance the clean time to 3.
mgr.StartApplyingTransaction(tx2);
@@ -518,21 +541,21 @@ TEST_F(MvccTest, TestTxnAbort) {
}
// This tests for a bug we were observing, where a clean snapshot would not
-// coalesce to the latest timestamp, for offline transactions.
-TEST_F(MvccTest, TestCleanTimeCoalescingOnOfflineTransactions) {
+// coalesce to the latest timestamp.
+TEST_F(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit) {
MvccManager mgr(clock_.get());
clock_->Update(Timestamp(20));
- CHECK_OK(mgr.StartTransactionAtTimestamp(Timestamp(10)));
- CHECK_OK(mgr.StartTransactionAtTimestamp(Timestamp(15)));
- mgr.OfflineAdjustSafeTime(Timestamp(15));
+ mgr.StartTransaction(Timestamp(10));
+ mgr.StartTransaction(Timestamp(15));
+ mgr.AdjustSafeTime(Timestamp(15));
mgr.StartApplyingTransaction(Timestamp(15));
- mgr.OfflineCommitTransaction(Timestamp(15));
+ mgr.CommitTransaction(Timestamp(15));
mgr.StartApplyingTransaction(Timestamp(10));
- mgr.OfflineCommitTransaction(Timestamp(10));
+ mgr.CommitTransaction(Timestamp(10));
ASSERT_EQ(mgr.cur_snap_.ToString(), "MvccSnapshot[committed={T|T < 15 or (T in {15})}]");
}
@@ -568,7 +591,8 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
// Start a transaction, and try committing it without having moved to "Applying"
// state.
- Timestamp t = mgr.StartTransaction();
+ Timestamp t = clock_->Now();
+ mgr.StartTransaction(t);
EXPECT_DEATH({
mgr.CommitTransaction(t);
}, "Trying to commit a transaction which never entered APPLYING state");
@@ -582,7 +606,9 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
}, "Trying to remove timestamp which isn't in the in-flight set: 21");
// Start a new transaction. This time, mark it as Applying.
- t = mgr.StartTransaction();
+ t = clock_->Now();
+ mgr.StartTransaction(t);
+ mgr.AdjustSafeTime(t);
mgr.StartApplyingTransaction(t);
// Can only call StartApplying once.
@@ -602,8 +628,9 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
MvccManager mgr(clock_.get());
- // Transactions with timestamp 1 through 3
- Timestamp tx1 = mgr.StartTransaction();
+ // Transaction with timestamp 1
+ Timestamp tx1 = clock_->Now();
+ mgr.StartTransaction(tx1);
// Wait until the 'tx1' timestamp is clean -- this won't happen because the
// transaction isn't committed yet.
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/mvcc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.cc b/src/kudu/tablet/mvcc.cc
index db79c77..f6316e8 100644
--- a/src/kudu/tablet/mvcc.cc
+++ b/src/kudu/tablet/mvcc.cc
@@ -35,59 +35,22 @@
namespace kudu { namespace tablet {
MvccManager::MvccManager(const scoped_refptr<server::Clock>& clock)
- : no_new_transactions_at_or_before_(Timestamp::kMin),
+ : safe_time_(Timestamp::kMin),
earliest_in_flight_(Timestamp::kMax),
clock_(clock) {
cur_snap_.all_committed_before_ = Timestamp::kInitialTimestamp;
cur_snap_.none_committed_at_or_after_ = Timestamp::kInitialTimestamp;
}
-Timestamp MvccManager::StartTransaction() {
- while (true) {
- Timestamp now = clock_->Now();
- std::lock_guard<LockType> l(lock_);
- if (PREDICT_TRUE(InitTransactionUnlocked(now))) {
- return now;
- }
- }
- // dummy return to avoid compiler warnings
- LOG(FATAL) << "Unreachable, added to avoid compiler warning.";
- return Timestamp::kInvalidTimestamp;
-}
-
-Timestamp MvccManager::StartTransactionAtLatest() {
+void MvccManager::StartTransaction(Timestamp timestamp) {
std::lock_guard<LockType> l(lock_);
- Timestamp now_latest = clock_->NowLatest();
- while (PREDICT_FALSE(!InitTransactionUnlocked(now_latest))) {
- now_latest = clock_->NowLatest();
- }
-
- // If in debug mode enforce that transactions have monotonically increasing
- // timestamps at all times
-#ifndef NDEBUG
- if (!timestamps_in_flight_.empty()) {
- Timestamp max(std::max_element(timestamps_in_flight_.begin(),
- timestamps_in_flight_.end())->first);
- CHECK_EQ(max.value(), now_latest.value());
- }
-#endif
-
- return now_latest;
-}
-
-Status MvccManager::StartTransactionAtTimestamp(Timestamp timestamp) {
- std::lock_guard<LockType> l(lock_);
- if (PREDICT_FALSE(cur_snap_.IsCommitted(timestamp))) {
- return Status::IllegalState(
- strings::Substitute("Timestamp: $0 is already committed. Current Snapshot: $1",
- timestamp.value(), cur_snap_.ToString()));
- }
- if (!InitTransactionUnlocked(timestamp)) {
- return Status::IllegalState(
- strings::Substitute("There is already a transaction with timestamp: $0 in flight.",
- timestamp.value()));
- }
- return Status::OK();
+ CHECK(!cur_snap_.IsCommitted(timestamp)) << "Trying to start a new txn at an already-committed"
+ << " timestamp: " << timestamp.ToString()
+ << " cur_snap_: " << cur_snap_.ToString();
+ CHECK(InitTransactionUnlocked(timestamp)) << "There is already a transaction with timestamp: "
+ << timestamp.value() << " in flight or this timestamp "
+ << "is at or before \"safe\" time. "
+ << "Current Snapshot: " << cur_snap_.ToString();
}
void MvccManager::StartApplyingTransaction(Timestamp timestamp) {
@@ -108,20 +71,10 @@ void MvccManager::StartApplyingTransaction(Timestamp timestamp) {
}
bool MvccManager::InitTransactionUnlocked(const Timestamp& timestamp) {
- // Ensure that we didn't mark the given timestamp as "safe" in between
- // acquiring the time and taking the lock. This allows us to acquire timestamps
- // outside of the MVCC lock.
- if (PREDICT_FALSE(no_new_transactions_at_or_before_ >= timestamp)) {
+ // Ensure that we didn't mark the given timestamp as "safe".
+ if (PREDICT_FALSE(safe_time_ >= timestamp)) {
return false;
}
- // Since transactions only commit once they are in the past, and new
- // transactions always start either in the current time or the future,
- // we should never be trying to start a new transaction at the same time
- // as an already-committed one.
- DCHECK(!cur_snap_.IsCommitted(timestamp))
- << "Trying to start a new txn at already-committed timestamp "
- << timestamp.ToString()
- << " cur_snap_: " << cur_snap_.ToString();
if (timestamp < earliest_in_flight_) {
earliest_in_flight_ = timestamp;
@@ -130,24 +83,6 @@ bool MvccManager::InitTransactionUnlocked(const Timestamp& timestamp) {
return InsertIfNotPresent(&timestamps_in_flight_, timestamp.value(), RESERVED);
}
-void MvccManager::CommitTransaction(Timestamp timestamp) {
- std::lock_guard<LockType> l(lock_);
- bool was_earliest = false;
- CommitTransactionUnlocked(timestamp, &was_earliest);
-
- // No more transactions will start with a ts that is lower than or equal
- // to 'timestamp', so we adjust the snapshot accordingly.
- if (no_new_transactions_at_or_before_ < timestamp) {
- no_new_transactions_at_or_before_ = timestamp;
- }
-
- if (was_earliest) {
- // If this transaction was the earliest in-flight, we might have to adjust
- // the "clean" timestamp.
- AdjustCleanTime();
- }
-}
-
void MvccManager::AbortTransaction(Timestamp timestamp) {
std::lock_guard<LockType> l(lock_);
@@ -163,7 +98,7 @@ void MvccManager::AbortTransaction(Timestamp timestamp) {
}
}
-void MvccManager::OfflineCommitTransaction(Timestamp timestamp) {
+void MvccManager::CommitTransaction(Timestamp timestamp) {
std::lock_guard<LockType> l(lock_);
// Commit the transaction, but do not adjust 'all_committed_before_', that will
@@ -171,8 +106,7 @@ void MvccManager::OfflineCommitTransaction(Timestamp timestamp) {
bool was_earliest = false;
CommitTransactionUnlocked(timestamp, &was_earliest);
- if (was_earliest &&
- no_new_transactions_at_or_before_ >= timestamp) {
+ if (was_earliest && safe_time_ >= timestamp) {
// If this transaction was the earliest in-flight, we might have to adjust
// the "clean" timestamp.
AdjustCleanTime();
@@ -225,13 +159,16 @@ void MvccManager::AdvanceEarliestInFlightTimestamp() {
}
}
-void MvccManager::OfflineAdjustSafeTime(Timestamp safe_time) {
+void MvccManager::AdjustSafeTime(Timestamp safe_time) {
std::lock_guard<LockType> l(lock_);
// No more transactions will start with a ts that is lower than or equal
// to 'safe_time', so we adjust the snapshot accordingly.
- if (no_new_transactions_at_or_before_ < safe_time) {
- no_new_transactions_at_or_before_ = safe_time;
+ if (PREDICT_TRUE(safe_time_ < safe_time)) {
+ safe_time_ = safe_time;
+ } else {
+ // If we couldn't adjust "safe" time don't bother adjusting "clean" time.
+ return;
}
AdjustCleanTime();
@@ -252,22 +189,22 @@ static void FilterTimestamps(std::vector<Timestamp::val_type>* v,
void MvccManager::AdjustCleanTime() {
// There are two possibilities:
//
- // 1) We still have an in-flight transaction earlier than 'no_new_transactions_at_or_before_'.
+ // 1) We still have an in-flight transaction earlier than 'safe_time_'.
// In this case, we update the watermark to that transaction's timestamp.
//
- // 2) There are no in-flight transactions earlier than 'no_new_transactions_at_or_before_'.
+ // 2) There are no in-flight transactions earlier than 'safe_time_'.
// (There may still be in-flight transactions with future timestamps due to
// commit-wait transactions which start in the future). In this case, we update
- // the watermark to 'no_new_transactions_at_or_before_', since we know that no new
+ // the watermark to 'safe_time_', since we know that no new
// transactions can start with an earlier timestamp.
//
// In either case, we have to add the newly committed ts only if it remains higher
// than the new watermark.
- if (earliest_in_flight_ < no_new_transactions_at_or_before_) {
+ if (earliest_in_flight_ < safe_time_) {
cur_snap_.all_committed_before_ = earliest_in_flight_;
} else {
- cur_snap_.all_committed_before_ = no_new_transactions_at_or_before_;
+ cur_snap_.all_committed_before_ = safe_time_;
}
// Filter out any committed timestamps that now fall below the watermark
@@ -516,33 +453,11 @@ void MvccSnapshot::AddCommittedTimestamp(Timestamp timestamp) {
////////////////////////////////////////////////////////////
// ScopedTransaction
////////////////////////////////////////////////////////////
-ScopedTransaction::ScopedTransaction(MvccManager *mgr, TimestampAssignmentType assignment_type)
+ScopedTransaction::ScopedTransaction(MvccManager* manager, Timestamp timestamp)
: done_(false),
- manager_(DCHECK_NOTNULL(mgr)),
- assignment_type_(assignment_type) {
-
- switch (assignment_type_) {
- case NOW: {
- timestamp_ = mgr->StartTransaction();
- break;
- }
- case NOW_LATEST: {
- timestamp_ = mgr->StartTransactionAtLatest();
- break;
- }
- default: {
- LOG(FATAL) << "Illegal TransactionAssignmentType. Only NOW and NOW_LATEST are supported"
- " by this ctor.";
- }
- }
-}
-
-ScopedTransaction::ScopedTransaction(MvccManager *mgr, Timestamp timestamp)
- : done_(false),
- manager_(DCHECK_NOTNULL(mgr)),
- assignment_type_(PRE_ASSIGNED),
+ manager_(DCHECK_NOTNULL(manager)),
timestamp_(timestamp) {
- CHECK_OK(mgr->StartTransactionAtTimestamp(timestamp));
+ manager_->StartTransaction(timestamp);
}
ScopedTransaction::~ScopedTransaction() {
@@ -556,21 +471,7 @@ void ScopedTransaction::StartApplying() {
}
void ScopedTransaction::Commit() {
- switch (assignment_type_) {
- case NOW:
- case NOW_LATEST: {
- manager_->CommitTransaction(timestamp_);
- break;
- }
- case PRE_ASSIGNED: {
- manager_->OfflineCommitTransaction(timestamp_);
- break;
- }
- default: {
- LOG(FATAL) << "Unexpected transaction assignment type.";
- }
- }
-
+ manager_->CommitTransaction(timestamp_);
done_ = true;
}
@@ -579,6 +480,5 @@ void ScopedTransaction::Abort() {
done_ = true;
}
-
} // namespace tablet
} // namespace kudu
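The safe/clean time rules that this patch leaves in `MvccManager` can be summarized with a small, single-threaded model. This is an illustrative sketch only, not Kudu's implementation: `ToyMvcc` is a hypothetical class, timestamps are plain `std::uint64_t` values, safe time is assumed to start at 0, and the clock, the APPLYING state, aborts and the snapshot's list of committed timestamps are all left out. It keeps the three rules visible in the diff: a transaction may only start above safe time, `AdjustSafeTime()` only moves forward (and returns early otherwise), and clean time is the lower of the earliest in-flight timestamp and safe time.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <set>

// Hypothetical, single-threaded model of the safe/clean time bookkeeping.
// NOT Kudu's MvccManager: no clock, no locking, no APPLYING state.
class ToyMvcc {
 public:
  using Ts = std::uint64_t;

  // Like the new StartTransaction(): timestamps at or below safe time, or
  // already in flight, are refused (Kudu CHECK-fails; we return false).
  bool StartTransaction(Ts ts) {
    if (ts <= safe_time_) return false;
    return in_flight_.insert(ts).second;
  }

  // Clean time only moves if this txn was the earliest in flight and safe
  // time has already passed it, mirroring the `was_earliest && safe_time_ >= ts` check.
  void CommitTransaction(Ts ts) {
    const bool was_earliest = !in_flight_.empty() && *in_flight_.begin() == ts;
    in_flight_.erase(ts);
    if (was_earliest && safe_time_ >= ts) AdjustCleanTime();
  }

  // Safe time only moves forward; if it did not move, clean time cannot either.
  void AdjustSafeTime(Ts safe_time) {
    if (safe_time_ < safe_time) {
      safe_time_ = safe_time;
    } else {
      return;
    }
    AdjustCleanTime();
  }

  Ts clean_time() const { return all_committed_before_; }

 private:
  // Clean time = min(earliest in-flight timestamp, safe time).
  void AdjustCleanTime() {
    const Ts earliest_in_flight =
        in_flight_.empty() ? std::numeric_limits<Ts>::max() : *in_flight_.begin();
    all_committed_before_ = std::min(earliest_in_flight, safe_time_);
  }

  std::set<Ts> in_flight_;
  Ts safe_time_ = 0;              // assumption: initial safe time is 0
  Ts all_committed_before_ = 0;   // the "clean" time
};
```

For example, with transactions at 10 and 20 in flight and safe time moved to 15, clean time stays at 10; committing 10 lets it advance to 15, and committing 20 does not move it further because 20 is still above safe time.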
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/mvcc.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 19ca2d5..94f3789 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -165,10 +165,6 @@ class MvccSnapshot {
// or
// 2) StartTransaction() -> AbortTransaction()
//
-// When a transaction is started, a timestamp is assigned. The manager will
-// never assign a timestamp if there is already another transaction with
-// the same timestamp in flight or previously committed.
-//
// When a transaction is ready to start making changes to in-memory data,
// it should transition to APPLYING state by calling StartApplyingTransaction().
// At this point, the transaction should apply its in-memory operations and
@@ -178,47 +174,23 @@ class MvccSnapshot {
// NOTE: we do not support "rollback" of in-memory edits. Thus, once we call
// StartApplyingTransaction(), the transaction _must_ commit.
//
+// See: docs/design_docs/repeatable-reads.md for more information on some of the concepts in
+// this class like "clean" and "safe" time.
class MvccManager {
public:
explicit MvccManager(const scoped_refptr<server::Clock>& clock);
- // Begin a new transaction, assigning it a transaction ID.
- // Callers should generally prefer using the ScopedTransaction class defined
- // below, which will automatically finish the transaction when it goes out
- // of scope.
- Timestamp StartTransaction();
-
- // The same as the above but but starts the transaction at the latest possible
- // time, i.e. now + max_error. Returns Timestamp::kInvalidTimestamp if it was
- // not possible to obtain the latest time.
- Timestamp StartTransactionAtLatest();
-
// Begins a new transaction, which is assigned the provided timestamp.
- // Returns Status::OK() if the transaction was started successfully or
- // Status::IllegalState() if the provided timestamp is already considered
- // committed, e.g. if timestamp < 'all_committed_before_'.
- Status StartTransactionAtTimestamp(Timestamp timestamp);
+ //
+ // Requires that 'timestamp' is not committed.
+ // Requires that 'timestamp' is greater than 'safe_time'.
+ void StartTransaction(Timestamp timestamp);
// Mark that the transaction with the given timestamp is starting to apply
// its writes to in-memory stores. This must be called before CommitTransaction().
// If this is called, then AbortTransaction(timestamp) must never be called.
void StartApplyingTransaction(Timestamp timestamp);
- // Commit the given transaction.
- //
- // If the transaction is not currently in-flight, this will trigger an
- // assertion error. It is an error to commit the same transaction more
- // than once.
- //
- // This should be used for 'true' online transaction processing on LEADER
- // replicas and not for delayed processing on FOLLOWER/LEARNER replicas or
- // on bootstrap, as this advances 'all_committed_before_' to clock_->Now()
- // when possible.
- //
- // The transaction must already have been marked as 'APPLYING' by calling
- // StartApplyingTransaction(), or else this logs a FATAL error.
- void CommitTransaction(Timestamp timestamp);
-
// Abort the given transaction.
//
// If the transaction is not currently in-flight, this will trigger an
@@ -226,23 +198,27 @@ class MvccManager {
// than once.
//
// This makes sure that the transaction with 'timestamp' is removed from
- // the in-flight set but without advancing the safe time since a new
- // transaction with a lower timestamp might be executed later.
+ // the in-flight set.
//
// The transaction must not have been marked as 'APPLYING' by calling
// StartApplyingTransaction(), or else this logs a FATAL error.
void AbortTransaction(Timestamp timestamp);
- // Same as commit transaction but does not advance 'all_committed_before_'.
- // Used for bootstrap and delayed processing in FOLLOWERS/LEARNERS.
+ // Commit the given transaction.
+ //
+ // If the transaction is not currently in-flight, this will trigger an
+ // assertion error. It is an error to commit the same transaction more
+ // than once.
//
// The transaction must already have been marked as 'APPLYING' by calling
// StartApplyingTransaction(), or else this logs a FATAL error.
- void OfflineCommitTransaction(Timestamp timestamp);
+ void CommitTransaction(Timestamp timestamp);
- // Used in conjunction with OfflineCommitTransaction() so that the mvcc
- // manager can trim state.
- void OfflineAdjustSafeTime(Timestamp safe_time);
+ // Adjusts the safe time so that the MvccManager can trim state.
+ //
+ // This must only be called when there is a guarantee that there won't be
+ // any more transactions with timestamps equal to or lower than 'safe_time'.
+ void AdjustSafeTime(Timestamp safe_time);
// Take a snapshot of the current MVCC state, which indicates which
// transactions have been committed at the time of this call.
@@ -308,7 +284,7 @@ class MvccManager {
friend class MvccTest;
FRIEND_TEST(MvccTest, TestAreAllTransactionsCommitted);
FRIEND_TEST(MvccTest, TestTxnAbort);
- FRIEND_TEST(MvccTest, TestCleanTimeCoalescingOnOfflineTransactions);
+ FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit);
FRIEND_TEST(MvccTest, TestWaitForApplyingTransactionsToCommit);
enum TxnState {
@@ -359,7 +335,7 @@ class MvccManager {
// Adjusts the clean time, i.e. the timestamp such that all transactions with
// lower timestamps are committed or aborted, based on which transactions are
- // currently in flight and on what is the latest value of 'no_new_transactions_at_or_before_'.
+ // currently in flight and on what is the latest value of 'safe_time_'.
void AdjustCleanTime();
// Advances the earliest in-flight timestamp, based on which transactions are
@@ -381,10 +357,10 @@ class MvccManager {
typedef std::unordered_map<Timestamp::val_type, TxnState> InFlightMap;
InFlightMap timestamps_in_flight_;
- // A transaction ID below which all transactions are either committed or in-flight,
+ // A transaction timestamp below which all transactions are either committed or in-flight,
// meaning no new transactions will be started with a timestamp that is equal
// to or lower than this one.
- Timestamp no_new_transactions_at_or_before_;
+ Timestamp safe_time_;
// The minimum timestamp in timestamps_in_flight_, or Timestamp::kMax
// if that set is empty. This is cached in order to avoid having to iterate
@@ -402,29 +378,10 @@ class MvccManager {
// committed.
class ScopedTransaction {
public:
-
- // How to assign the timestamp to this transaction:
- // NOW - Based on the value obtained from clock_->Now().
- // NOW_LATEST - Based on the value obtained from clock_->NowLatest().
- // PRE_ASSIGNED - Based on the value passed in the ctor.
- enum TimestampAssignmentType {
- NOW,
- NOW_LATEST,
- PRE_ASSIGNED
- };
-
// Create a new transaction from the given MvccManager.
- // If 'latest' is true this transaction will use MvccManager::StartTransactionAtLatest()
- // instead of MvccManager::StartTransaction().
//
- // The MvccManager must remain valid for the lifetime of this object.
- explicit ScopedTransaction(MvccManager *manager, TimestampAssignmentType assignment_type = NOW);
-
- // Like the ctor above but starts the transaction at a pre-defined timestamp.
- // When this transaction is committed it will use MvccManager::OfflineCommitTransaction()
- // so this is appropriate for offline replaying of transactions for replica catch-up or
- // bootstrap.
- explicit ScopedTransaction(MvccManager *manager, Timestamp timestamp);
+ // When this transaction is committed it will use MvccManager::CommitTransaction().
+ ScopedTransaction(MvccManager* manager, Timestamp timestamp);
// Commit the transaction referenced by this scoped object, if it hasn't
// already been committed.
@@ -454,8 +411,7 @@ class ScopedTransaction {
private:
bool done_;
MvccManager * const manager_;
- TimestampAssignmentType assignment_type_;
- Timestamp timestamp_;
+ const Timestamp timestamp_;
DISALLOW_COPY_AND_ASSIGN(ScopedTransaction);
};
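After this patch `ScopedTransaction` has a single constructor that takes a caller-supplied timestamp, and `Commit()` always goes through `MvccManager::CommitTransaction()`. A minimal RAII sketch of that shape follows. `RecordingMvcc` and `ScopedTxnSketch` are hypothetical names; the destructor aborting an unfinished transaction is an assumption, since the destructor body is not shown in this diff; and `StartApplying()` is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <stdexcept>

// Hypothetical stand-in for MvccManager that only records txn outcomes.
class RecordingMvcc {
 public:
  void StartTransaction(std::uint64_t ts) {
    // Kudu CHECK-fails on a duplicate in-flight timestamp; we throw instead.
    if (!in_flight.insert(ts).second) throw std::logic_error("duplicate ts");
  }
  void CommitTransaction(std::uint64_t ts) { Finish(ts); committed.insert(ts); }
  void AbortTransaction(std::uint64_t ts) { Finish(ts); aborted.insert(ts); }

  std::set<std::uint64_t> in_flight, committed, aborted;

 private:
  void Finish(std::uint64_t ts) {
    if (in_flight.erase(ts) != 1) throw std::logic_error("not in flight");
  }
};

// Same shape as the new ScopedTransaction: the timestamp is always supplied
// by the caller and is const for the lifetime of the guard.
class ScopedTxnSketch {
 public:
  ScopedTxnSketch(RecordingMvcc* manager, std::uint64_t timestamp)
      : manager_(manager), timestamp_(timestamp) {
    manager_->StartTransaction(timestamp_);
  }
  // Assumption: a transaction that was never committed is aborted here.
  ~ScopedTxnSketch() {
    if (!done_) manager_->AbortTransaction(timestamp_);
  }
  ScopedTxnSketch(const ScopedTxnSketch&) = delete;
  ScopedTxnSketch& operator=(const ScopedTxnSketch&) = delete;

  // Single commit path, no branching on an assignment type.
  void Commit() {
    manager_->CommitTransaction(timestamp_);
    done_ = true;
  }

 private:
  bool done_ = false;
  RecordingMvcc* const manager_;
  const std::uint64_t timestamp_;
};
```

Dropping `TimestampAssignmentType` removes the `switch` in both the constructor and `Commit()`, which is why `timestamp_` can become `const`.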
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 3793026..91674ff 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -369,17 +369,22 @@ Status Tablet::AcquireLockForOp(WriteTransactionState* tx_state, RowOp* op) {
}
void Tablet::AssignTimestampAndStartTransactionForTests(WriteTransactionState* tx_state) {
- gscoped_ptr<ScopedTransaction> mvcc_tx;
CHECK(!tx_state->has_timestamp());
+ // Don't support COMMIT_WAIT for tests that don't boot a tablet server.
+ CHECK_NE(tx_state->external_consistency_mode(), COMMIT_WAIT);
- // We either assign a timestamp in the future, if the consistency mode is COMMIT_WAIT, or
- // we assign one in the present if the consistency mode is any other one.
- if (tx_state->external_consistency_mode() == COMMIT_WAIT) {
- mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW_LATEST));
- } else {
- mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW));
+ // Make sure timestamp assignment and transaction start are atomic, for tests.
+ //
+ // This is to make sure that when test txns advance safe time later, we don't have
+ // any txn in-flight between getting a timestamp and being started. Otherwise we
+ // might run the risk of assigning a timestamp to txn1, and have another txn
+ // get a timestamp/start/advance safe time before txn1 starts making txn1's timestamp
+ // invalid on start.
+ // invalid on start.
+ {
+ std::lock_guard<simple_spinlock> l(test_start_txn_lock_);
+ tx_state->set_timestamp(clock_->Now());
+ StartTransaction(tx_state);
}
- tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
}
void Tablet::StartTransaction(WriteTransactionState* tx_state) {
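The comment in `AssignTimestampAndStartTransactionForTests()` describes a race: if reading the clock and starting the transaction are not atomic, txn1 can read timestamp 5, txn2 can read 6, start, commit and advance safe time to 6, and txn1's start at 5 is then rejected because 5 is at or below safe time. The sketch below shows the locking pattern under stated assumptions: `ToyClock`, `LockedMvcc` and `RunConcurrentWrites()` are hypothetical, the clock is a strictly increasing counter rather than Kudu's `server::Clock`, and a `std::mutex` stands in for `simple_spinlock`. Following the comment on `test_start_txn_lock_` in tablet.h, safe time adjustment is also done under the same lock.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical monotonic clock: each call returns a strictly larger value.
class ToyClock {
 public:
  std::uint64_t Now() { return ++last_; }

 private:
  std::atomic<std::uint64_t> last_{0};
};

// Hypothetical MVCC stand-in with its own internal lock. Only the
// "refuse timestamps at or below safe time" rule is modeled.
class LockedMvcc {
 public:
  bool StartTransaction(std::uint64_t ts) {
    std::lock_guard<std::mutex> l(lock_);
    if (ts <= safe_time_) return false;
    return in_flight_.insert(ts).second;
  }
  void CommitTransaction(std::uint64_t ts) {
    std::lock_guard<std::mutex> l(lock_);
    in_flight_.erase(ts);
  }
  void AdjustSafeTime(std::uint64_t ts) {
    std::lock_guard<std::mutex> l(lock_);
    safe_time_ = std::max(safe_time_, ts);
  }
  std::uint64_t safe_time() {
    std::lock_guard<std::mutex> l(lock_);
    return safe_time_;
  }

 private:
  std::mutex lock_;
  std::set<std::uint64_t> in_flight_;
  std::uint64_t safe_time_ = 0;
};

// Runs 'threads' writers doing 'writes' txns each and returns how many
// txns could not be started. The clock read and the start happen under one
// test-only lock, so no txn holds an unstarted timestamp while another
// advances safe time past it.
int RunConcurrentWrites(int threads, int writes, LockedMvcc* mvcc) {
  ToyClock clock;
  std::mutex test_start_txn_lock;
  std::atomic<int> failures{0};
  std::vector<std::thread> workers;
  for (int t = 0; t < threads; ++t) {
    workers.emplace_back([&] {
      for (int i = 0; i < writes; ++i) {
        std::uint64_t ts = 0;
        {
          std::lock_guard<std::mutex> l(test_start_txn_lock);
          ts = clock.Now();
          if (!mvcc->StartTransaction(ts)) {
            ++failures;
            continue;
          }
        }
        mvcc->CommitTransaction(ts);
        // Safe time is advanced under the same test-only lock.
        std::lock_guard<std::mutex> l(test_start_txn_lock);
        mvcc->AdjustSafeTime(ts);
      }
    });
  }
  for (auto& w : workers) w.join();
  return failures.load();
}
```

With the lock in place no start should fail, and the final safe time equals the largest timestamp handed out. Serializing the clock read and the start is also the source of the test-only slowdown mentioned in the commit message.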
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 56b28e4..994f98e 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -127,7 +127,8 @@ class Tablet {
// it's not the first thing in a transaction!
void StartTransaction(WriteTransactionState* tx_state);
- // Like the above but actually assigns the timestamp. Only used for tests.
+ // Like the above but actually assigns the timestamp. Only used for tests that
+ // don't boot a tablet server.
void AssignTimestampAndStartTransactionForTests(WriteTransactionState*
tx_state);
// Insert a new row into the tablet.
@@ -476,6 +477,12 @@ class Tablet {
std::string LogPrefix() const;
+ // Test-only lock that synchronizes access to AssignTimestampAndStartTransactionForTests().
+ // Tests that use LocalTabletWriter take this lock to synchronize timestamp assignment,
+ // transaction start and safe time adjustment.
+ // NOTE: Should not be taken on non-test paths.
+ mutable simple_spinlock test_start_txn_lock_;
+
// Lock protecting schema_ and key_schema_.
//
// Writers take this lock in shared mode before decoding and projecting
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 8650e81..55a76d2 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -500,7 +500,7 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
// Before playing any segments we set the safe and clean times to 'kMin' so that
// the MvccManager will accept all transactions that we replay as uncommitted.
- tablet_->mvcc_manager()->OfflineAdjustSafeTime(Timestamp::kMin);
+ tablet_->mvcc_manager()->AdjustSafeTime(Timestamp::kMin);
RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
// Flush the consensus metadata once at the end to persist our changes, if any.
@@ -1005,7 +1005,7 @@ Status TabletBootstrap::HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB*
Timestamp(replicate->timestamp()),
MonoDelta::FromMicroseconds(-FLAGS_max_clock_sync_error_usec));
}
- tablet_->mvcc_manager()->OfflineAdjustSafeTime(safe_time);
+ tablet_->mvcc_manager()->AdjustSafeTime(safe_time);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cc120dac/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 623767b..23beaad 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -445,7 +445,7 @@ Status TransactionDriver::ApplyAsync() {
// Now that the transaction is committed in consensus advance the safe time.
if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) {
transaction_->state()->tablet_peer()->tablet()->mvcc_manager()->
- OfflineAdjustSafeTime(transaction_->state()->timestamp());
+ AdjustSafeTime(transaction_->state()->timestamp());
}
} else {
DCHECK_EQ(replication_state_, REPLICATION_FAILED);