Repository: kudu
Updated Branches:
  refs/heads/master c19f30225 -> 3d3c6f033


KUDU-798 (part 1) Unify leader/follower mvcc behavior

This patch does the following:
1) It moves timestamp assignment out of the tablet and into the
TransactionDriver, where it is done prior to pushing the operation to
consensus for replication. Follow-up patches will move it to be done
within consensus itself (though not necessarily managed by any of the
consensus classes).
2) It makes all operations be "operations at a timestamp", making
all operations have the same behavior within mvcc independently of
whether they were started at the leader or at a follower.

Follow-up patches will completely remove the Mvcc APIs for automatic
safe time advancement and timestamp assignment and will introduce
the new entity responsible for "safe time".

Change-Id: I3ba7212f9211f585d4bef00e5ccfc24d5eece224
Reviewed-on: http://gerrit.cloudera.org:8080/5055
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b8093f03
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b8093f03
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b8093f03

Branch: refs/heads/master
Commit: b8093f033504bb0cd2de94500a59225e61f5e7bd
Parents: c19f302
Author: David Alves <[email protected]>
Authored: Fri Nov 11 11:31:42 2016 -0800
Committer: David Ribeiro Alves <[email protected]>
Committed: Tue Nov 29 21:53:56 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/local_tablet_writer.h           |  2 +-
 src/kudu/tablet/tablet.cc                       | 22 +++++-----
 src/kudu/tablet/tablet.h                        | 38 +++--------------
 src/kudu/tablet/tablet_peer.cc                  |  6 ++-
 .../tablet/transactions/transaction_driver.cc   | 25 ++++++++---
 .../tablet/transactions/transaction_driver.h    | 44 +++++++++++++++++++-
 .../transactions/transaction_tracker-test.cc    | 14 ++++---
 7 files changed, 93 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/local_tablet_writer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/local_tablet_writer.h 
b/src/kudu/tablet/local_tablet_writer.h
index 7d488e8..ee9fbdb 100644
--- a/src/kudu/tablet/local_tablet_writer.h
+++ b/src/kudu/tablet/local_tablet_writer.h
@@ -96,7 +96,7 @@ class LocalTabletWriter {
 
     RETURN_NOT_OK(tablet_->DecodeWriteOperations(client_schema_, 
tx_state_.get()));
     RETURN_NOT_OK(tablet_->AcquireRowLocks(tx_state_.get()));
-    tablet_->StartTransaction(tx_state_.get());
+    tablet_->AssignTimestampAndStartTransactionForTests(tx_state_.get());
 
     // Create a "fake" OpId and set it in the TransactionState for anchoring.
     tx_state_->mutable_op_id()->CopyFrom(consensus::MaximumOpId());

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index bcde9d0..3793026 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -368,18 +368,13 @@ Status Tablet::AcquireLockForOp(WriteTransactionState* 
tx_state, RowOp* op) {
   return Status::OK();
 }
 
-void Tablet::StartTransaction(WriteTransactionState* tx_state) {
+void Tablet::AssignTimestampAndStartTransactionForTests(WriteTransactionState* 
tx_state) {
   gscoped_ptr<ScopedTransaction> mvcc_tx;
+  CHECK(!tx_state->has_timestamp());
 
-  // If the state already has a timestamp then we're replaying a transaction 
that occurred
-  // before a crash or at another node...
-  if (tx_state->has_timestamp()) {
-    mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp()));
-
-  // ... otherwise this is a new transaction and we must assign a new 
timestamp. We either
-  // assign a timestamp in the future, if the consistency mode is COMMIT_WAIT, 
or we assign
-  // one in the present if the consistency mode is any other one.
-  } else if (tx_state->external_consistency_mode() == COMMIT_WAIT) {
+  // We either assign a timestamp in the future, if the consistency mode is 
COMMIT_WAIT, or
+  // we assign one in the present if the consistency mode is any other one.
+  if (tx_state->external_consistency_mode() == COMMIT_WAIT) {
     mvcc_tx.reset(new ScopedTransaction(&mvcc_, 
ScopedTransaction::NOW_LATEST));
   } else {
     mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW));
@@ -387,6 +382,13 @@ void Tablet::StartTransaction(WriteTransactionState* 
tx_state) {
   tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
 }
 
+void Tablet::StartTransaction(WriteTransactionState* tx_state) {
+  gscoped_ptr<ScopedTransaction> mvcc_tx;
+  DCHECK(tx_state->has_timestamp());
+  mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp()));
+  tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
+}
+
 Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
                                       RowOp* op,
                                       ProbeStats* stats) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 2986345..56b28e4 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -121,41 +121,15 @@ class Tablet {
   // otherwise destructed).
   Status AcquireRowLocks(WriteTransactionState* tx_state);
 
-  // Finish the Prepare phase of a write transaction.
+  // Starts an MVCC transaction which must have a pre-assigned timestamp.
   //
-  // Starts an MVCC transaction and assigns a timestamp for the transaction.
-  // This also snapshots the current set of tablet components into the 
transaction
-  // state.
-  //
-  // This should always be done _after_ any relevant row locks are acquired
-  // (using CreatePreparedInsert/CreatePreparedMutate). This ensures that,
-  // within each row, timestamps only move forward. If we took a timestamp 
before
-  // getting the row lock, we could have the following situation:
-  //
-  //   Thread 1         |  Thread 2
-  //   ----------------------
-  //   Start tx 1       |
-  //                    |  Start tx 2
-  //                    |  Obtain row lock
-  //                    |  Update row
-  //                    |  Commit tx 2
-  //   Obtain row lock  |
-  //   Delete row       |
-  //   Commit tx 1
-  //
-  // This would cause the mutation list to look like: @t1: DELETE, @t2: UPDATE
-  // which is invalid, since we expect to be able to be able to replay 
mutations
-  // in increasing timestamp order on a given row.
-  //
-  // This requirement is basically two-phase-locking: the order in which row 
locks
-  // are acquired for transactions determines their serialization order. 
If/when
-  // we support multi-node serializable transactions, we'll have to acquire 
_all_
-  // row locks (across all nodes) before obtaining a timestamp.
-  //
-  // TODO: rename this to something like "FinishPrepare" or "StartApply", since
+  // TODO(todd): rename this to something like "FinishPrepare" or 
"StartApply", since
   // it's not the first thing in a transaction!
   void StartTransaction(WriteTransactionState* tx_state);
 
+  // Like the above but actually assigns the timestamp. Only used for tests.
+  void AssignTimestampAndStartTransactionForTests(WriteTransactionState* 
tx_state);
+
   // Insert a new row into the tablet.
   //
   // The provided 'data' slice should have length equivalent to this
@@ -399,7 +373,7 @@ class Tablet {
   // - the tablet components have been acquired
   // - the operation has been decoded
   Status InsertOrUpsertUnlocked(WriteTransactionState *tx_state,
-                                RowOp* insert,
+                                RowOp* op,
                                 ProbeStats* stats);
 
   // Same as above, but for UPDATE.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index 46ce6ec..f636f9c 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -554,7 +554,8 @@ Status 
TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transacti
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_);
+    &txn_order_verifier_,
+    clock_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER));
   driver->swap(tx_driver);
 
@@ -569,7 +570,8 @@ Status 
TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transact
     log_.get(),
     prepare_pool_.get(),
     apply_pool_,
-    &txn_order_verifier_);
+    &txn_order_verifier_,
+    clock_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA));
   driver->swap(tx_driver);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc 
b/src/kudu/tablet/transactions/transaction_driver.cc
index 3a10c16..623767b 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -86,13 +86,15 @@ TransactionDriver::TransactionDriver(TransactionTracker 
*txn_tracker,
                                      Log* log,
                                      ThreadPool* prepare_pool,
                                      ThreadPool* apply_pool,
-                                     TransactionOrderVerifier* order_verifier)
+                                     TransactionOrderVerifier* order_verifier,
+                                     scoped_refptr<server::Clock> clock)
     : txn_tracker_(txn_tracker),
       consensus_(consensus),
       log_(log),
       prepare_pool_(prepare_pool),
       apply_pool_(apply_pool),
       order_verifier_(order_verifier),
+      clock_(std::move(clock)),
       trace_(new Trace()),
       start_time_(MonoTime::Now()),
       replication_state_(NOT_REPLICATING),
@@ -241,7 +243,6 @@ Status TransactionDriver::PrepareAndStart() {
   prepare_physical_timestamp_ = GetMonoTimeMicros();
 
   RETURN_NOT_OK(transaction_->Prepare());
-  RETURN_NOT_OK(transaction_->Start());
 
   // Only take the lock long enough to take a local copy of the
   // replication state and set our prepare state. This ensures that
@@ -259,6 +260,7 @@ Status TransactionDriver::PrepareAndStart() {
     // preempted after the state is prepared apply can be triggered by another 
thread without the
     // rpc being registered.
     if (transaction_->type() == consensus::REPLICA) {
+      RETURN_NOT_OK(transaction_->Start());
       RegisterFollowerTransactionOnResultTracker();
     // ... else we're a client-started transaction. Make sure we're still the 
driver of the
     // RPC and give up if we aren't.
@@ -277,13 +279,25 @@ Status TransactionDriver::PrepareAndStart() {
   switch (repl_state_copy) {
     case NOT_REPLICATING:
     {
-      // Set the timestamp in the message, now that it's prepared.
+      // Assign the timestamp just before submitting the transaction to 
consensus, if
+      // it doesn't have one.
+      // This is a placeholder since in the near future the timestamp will be 
assigned
+      // within consensus.
+      // TODO(dralves) Remove this when the new TimeManager class gets in 
(part of KUDU-798)
+      DCHECK(!transaction_->state()->has_timestamp());
+      if (transaction_->state()->external_consistency_mode() == COMMIT_WAIT) {
+        transaction_->state()->set_timestamp(clock_->NowLatest());
+      } else {
+        transaction_->state()->set_timestamp(clock_->Now());
+      }
+
       transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
           transaction_->state()->timestamp().ToUint64());
 
+      RETURN_NOT_OK(transaction_->Start());
+
       VLOG_WITH_PREFIX(4) << "Triggering consensus repl";
       // Trigger the consensus replication.
-
       {
         std::lock_guard<simple_spinlock> lock(lock_);
         replication_state_ = REPLICATING;
@@ -427,8 +441,7 @@ Status TransactionDriver::ApplyAsync() {
     DCHECK_EQ(prepare_state_, PREPARED);
     if (transaction_status_.ok()) {
       DCHECK_EQ(replication_state_, REPLICATED);
-      order_verifier_->CheckApply(op_id_copy_.index(),
-                                  prepare_physical_timestamp_);
+      order_verifier_->CheckApply(op_id_copy_.index(), 
prepare_physical_timestamp_);
       // Now that the transaction is committed in consensus advance the safe 
time.
       if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) {
         transaction_->state()->tablet_peer()->tablet()->mvcc_manager()->

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h 
b/src/kudu/tablet/transactions/transaction_driver.h
index 0501299..1072703 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -24,6 +24,7 @@
 #include "kudu/consensus/consensus.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/server/clock.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
@@ -96,6 +97,40 @@ class TransactionTracker;
 //
 // 
===========================================================================================
 //
+// Ordering requirements for lock acquisition and write (mvcc) transaction 
start
+//
+// On the leader side, starting the mvcc transaction for writes
+// (calling tablet_->StartTransaction()) must always be done _after_ any 
relevant row locks are
+// acquired (using AcquireLockForOp). This ensures that, within each row, 
timestamps only move
+// forward. If we took a timestamp before getting the row lock, we could have 
the following
+// situation:
+//
+//   Thread 1         |  Thread 2
+//   ----------------------
+//   Start tx 1       |
+//                    |  Start tx 2
+//                    |  Obtain row lock
+//                    |  Update row
+//                    |  Commit tx 2
+//   Obtain row lock  |
+//   Delete row       |
+//   Commit tx 1
+//
+// This would cause the mutation list to look like: @t1: DELETE, @t2: UPDATE 
which is invalid,
+// since we expect to be able to replay mutations in increasing 
timestamp order on a
+// given row.
+//
+// This requirement is basically two-phase-locking: the order in which row 
locks are acquired for
+// transactions determines their serialization order. If/when we support 
multi-node serializable
+// transactions, we'll have to acquire _all_ row locks (across all nodes) 
before obtaining a
+// timestamp.
+//
+// Note that on non-leader replicas this requirement is no longer relevant. 
The leader assigned
+// the timestamps and serialized the transactions properly, so calling 
tablet_->StartTransaction()
+// before lock acquisition on non-leader replicas is inconsequential.
+//
+// 
===========================================================================================
+//
 // Tracking transaction results for exactly once semantics
 //
 // Exactly once semantics for transactions require that the results of 
previous executions
@@ -188,7 +223,8 @@ class TransactionDriver : public 
RefCountedThreadSafe<TransactionDriver> {
                     log::Log* log,
                     ThreadPool* prepare_pool,
                     ThreadPool* apply_pool,
-                    TransactionOrderVerifier* order_verifier);
+                    TransactionOrderVerifier* order_verifier,
+                    scoped_refptr<server::Clock> clock);
 
   // Perform any non-constructor initialization. Sets the transaction
   // that will be executed.
@@ -264,6 +300,7 @@ class TransactionDriver : public 
RefCountedThreadSafe<TransactionDriver> {
   // The task submitted to the prepare threadpool to prepare and start
   // the transaction. If PrepareAndStart() fails, calls HandleFailure.
   void PrepareAndStartTask();
+
   // Actually prepare and start.
   Status PrepareAndStart();
 
@@ -317,6 +354,11 @@ class TransactionDriver : public 
RefCountedThreadSafe<TransactionDriver> {
   // Lock that synchronizes access to the transaction's state.
   mutable simple_spinlock lock_;
 
+  // Temporarily have the clock on the driver so that we can assign timestamps 
to
+  // transactions.
+  // TODO(dralves) Remove this when the new TimeManager class gets in (part of 
KUDU-798).
+  scoped_refptr<server::Clock> clock_;
+
   // A copy of the transaction's OpId, set when the transaction first
   // receives one from Consensus and uninitialized until then.
   // TODO(todd): we have three separate copies of this now -- in 
TransactionState,

http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_tracker-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker-test.cc 
b/src/kudu/tablet/transactions/transaction_tracker-test.cc
index 6b45157..54ef6fe 100644
--- a/src/kudu/tablet/transactions/transaction_tracker-test.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker-test.cc
@@ -88,12 +88,14 @@ class TransactionTrackerTest : public KuduTest {
                     vector<scoped_refptr<TransactionDriver> >* drivers) {
     vector<scoped_refptr<TransactionDriver> > local_drivers;
     for (int i = 0; i < num_drivers; i++) {
-      scoped_refptr<TransactionDriver> driver(new TransactionDriver(&tracker_,
-                                                                    nullptr,
-                                                                    nullptr,
-                                                                    nullptr,
-                                                                    nullptr,
-                                                                    nullptr));
+      scoped_refptr<TransactionDriver> driver(
+          new TransactionDriver(&tracker_,
+                                nullptr,
+                                nullptr,
+                                nullptr,
+                                nullptr,
+                                nullptr,
+                                scoped_refptr<server::Clock>()));
       gscoped_ptr<NoOpTransaction> tx(new NoOpTransaction(new 
NoOpTransactionState));
       RETURN_NOT_OK(driver->Init(tx.PassAs<Transaction>(), consensus::LEADER));
       local_drivers.push_back(driver);

Reply via email to