Repository: kudu Updated Branches: refs/heads/master c19f30225 -> 3d3c6f033
KUDU-798 (part 1) Unify leader/follower mvcc behavior This patch does the following: 1) It moves timestamp assignment from the tablet into the TransactionDriver, where it is done prior to pushing the operation to consensus for replication. Follow-up patches will move it to be done within consensus itself (though not necessarily managed by any of the consensus classes). 2) It makes all operations be "operations at a timestamp", giving all operations the same behavior within mvcc regardless of whether they were started at the leader or at a follower. Follow-up patches will completely remove the Mvcc APIs for automatic safe time advancement and timestamp assignment and will introduce the new entity responsible for "safe time". Change-Id: I3ba7212f9211f585d4bef00e5ccfc24d5eece224 Reviewed-on: http://gerrit.cloudera.org:8080/5055 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b8093f03 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b8093f03 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b8093f03 Branch: refs/heads/master Commit: b8093f033504bb0cd2de94500a59225e61f5e7bd Parents: c19f302 Author: David Alves <[email protected]> Authored: Fri Nov 11 11:31:42 2016 -0800 Committer: David Ribeiro Alves <[email protected]> Committed: Tue Nov 29 21:53:56 2016 +0000 ---------------------------------------------------------------------- src/kudu/tablet/local_tablet_writer.h | 2 +- src/kudu/tablet/tablet.cc | 22 +++++----- src/kudu/tablet/tablet.h | 38 +++-------------- src/kudu/tablet/tablet_peer.cc | 6 ++- .../tablet/transactions/transaction_driver.cc | 25 ++++++++--- .../tablet/transactions/transaction_driver.h | 44 +++++++++++++++++++- .../transactions/transaction_tracker-test.cc | 14 ++++--- 7 files changed, 93 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/local_tablet_writer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/local_tablet_writer.h b/src/kudu/tablet/local_tablet_writer.h index 7d488e8..ee9fbdb 100644 --- a/src/kudu/tablet/local_tablet_writer.h +++ b/src/kudu/tablet/local_tablet_writer.h @@ -96,7 +96,7 @@ class LocalTabletWriter { RETURN_NOT_OK(tablet_->DecodeWriteOperations(client_schema_, tx_state_.get())); RETURN_NOT_OK(tablet_->AcquireRowLocks(tx_state_.get())); - tablet_->StartTransaction(tx_state_.get()); + tablet_->AssignTimestampAndStartTransactionForTests(tx_state_.get()); // Create a "fake" OpId and set it in the TransactionState for anchoring. tx_state_->mutable_op_id()->CopyFrom(consensus::MaximumOpId()); http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index bcde9d0..3793026 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -368,18 +368,13 @@ Status Tablet::AcquireLockForOp(WriteTransactionState* tx_state, RowOp* op) { return Status::OK(); } -void Tablet::StartTransaction(WriteTransactionState* tx_state) { +void Tablet::AssignTimestampAndStartTransactionForTests(WriteTransactionState* tx_state) { gscoped_ptr<ScopedTransaction> mvcc_tx; + CHECK(!tx_state->has_timestamp()); - // If the state already has a timestamp then we're replaying a transaction that occurred - // before a crash or at another node... - if (tx_state->has_timestamp()) { - mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp())); - - // ... otherwise this is a new transaction and we must assign a new timestamp. We either - // assign a timestamp in the future, if the consistency mode is COMMIT_WAIT, or we assign - // one in the present if the consistency mode is any other one. 
- } else if (tx_state->external_consistency_mode() == COMMIT_WAIT) { + // We either assign a timestamp in the future, if the consistency mode is COMMIT_WAIT, or + // we assign one in the present if the consistency mode is any other one. + if (tx_state->external_consistency_mode() == COMMIT_WAIT) { mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW_LATEST)); } else { mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW)); @@ -387,6 +382,13 @@ void Tablet::StartTransaction(WriteTransactionState* tx_state) { tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx)); } +void Tablet::StartTransaction(WriteTransactionState* tx_state) { + gscoped_ptr<ScopedTransaction> mvcc_tx; + DCHECK(tx_state->has_timestamp()); + mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp())); + tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx)); +} + Status Tablet::InsertOrUpsertUnlocked(WriteTransactionState *tx_state, RowOp* op, ProbeStats* stats) { http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 2986345..56b28e4 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -121,41 +121,15 @@ class Tablet { // otherwise destructed). Status AcquireRowLocks(WriteTransactionState* tx_state); - // Finish the Prepare phase of a write transaction. + // Starts an MVCC transaction which must have a pre-assigned timestamp. // - // Starts an MVCC transaction and assigns a timestamp for the transaction. - // This also snapshots the current set of tablet components into the transaction - // state. - // - // This should always be done _after_ any relevant row locks are acquired - // (using CreatePreparedInsert/CreatePreparedMutate). This ensures that, - // within each row, timestamps only move forward. 
If we took a timestamp before - // getting the row lock, we could have the following situation: - // - // Thread 1 | Thread 2 - // ---------------------- - // Start tx 1 | - // | Start tx 2 - // | Obtain row lock - // | Update row - // | Commit tx 2 - // Obtain row lock | - // Delete row | - // Commit tx 1 - // - // This would cause the mutation list to look like: @t1: DELETE, @t2: UPDATE - // which is invalid, since we expect to be able to be able to replay mutations - // in increasing timestamp order on a given row. - // - // This requirement is basically two-phase-locking: the order in which row locks - // are acquired for transactions determines their serialization order. If/when - // we support multi-node serializable transactions, we'll have to acquire _all_ - // row locks (across all nodes) before obtaining a timestamp. - // - // TODO: rename this to something like "FinishPrepare" or "StartApply", since + // TODO(todd): rename this to something like "FinishPrepare" or "StartApply", since // it's not the first thing in a transaction! void StartTransaction(WriteTransactionState* tx_state); + // Like the above but actually assigns the timestamp. Only used for tests. + void AssignTimestampAndStartTransactionForTests(WriteTransactionState* tx_state); + // Insert a new row into the tablet. // // The provided 'data' slice should have length equivalent to this @@ -399,7 +373,7 @@ class Tablet { // - the tablet components have been acquired // - the operation has been decoded Status InsertOrUpsertUnlocked(WriteTransactionState *tx_state, - RowOp* insert, + RowOp* op, ProbeStats* stats); // Same as above, but for UPDATE. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/tablet_peer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc index 46ce6ec..f636f9c 100644 --- a/src/kudu/tablet/tablet_peer.cc +++ b/src/kudu/tablet/tablet_peer.cc @@ -554,7 +554,8 @@ Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transacti log_.get(), prepare_pool_.get(), apply_pool_, - &txn_order_verifier_); + &txn_order_verifier_, + clock_); RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER)); driver->swap(tx_driver); @@ -569,7 +570,8 @@ Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transact log_.get(), prepare_pool_.get(), apply_pool_, - &txn_order_verifier_); + &txn_order_verifier_, + clock_); RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA)); driver->swap(tx_driver); http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_driver.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc index 3a10c16..623767b 100644 --- a/src/kudu/tablet/transactions/transaction_driver.cc +++ b/src/kudu/tablet/transactions/transaction_driver.cc @@ -86,13 +86,15 @@ TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker, Log* log, ThreadPool* prepare_pool, ThreadPool* apply_pool, - TransactionOrderVerifier* order_verifier) + TransactionOrderVerifier* order_verifier, + scoped_refptr<server::Clock> clock) : txn_tracker_(txn_tracker), consensus_(consensus), log_(log), prepare_pool_(prepare_pool), apply_pool_(apply_pool), order_verifier_(order_verifier), + clock_(std::move(clock)), trace_(new Trace()), start_time_(MonoTime::Now()), replication_state_(NOT_REPLICATING), @@ -241,7 +243,6 @@ Status 
TransactionDriver::PrepareAndStart() { prepare_physical_timestamp_ = GetMonoTimeMicros(); RETURN_NOT_OK(transaction_->Prepare()); - RETURN_NOT_OK(transaction_->Start()); // Only take the lock long enough to take a local copy of the // replication state and set our prepare state. This ensures that @@ -259,6 +260,7 @@ Status TransactionDriver::PrepareAndStart() { // preempted after the state is prepared apply can be triggered by another thread without the // rpc being registered. if (transaction_->type() == consensus::REPLICA) { + RETURN_NOT_OK(transaction_->Start()); RegisterFollowerTransactionOnResultTracker(); // ... else we're a client-started transaction. Make sure we're still the driver of the // RPC and give up if we aren't. @@ -277,13 +279,25 @@ Status TransactionDriver::PrepareAndStart() { switch (repl_state_copy) { case NOT_REPLICATING: { - // Set the timestamp in the message, now that it's prepared. + // Assign the timestamp just before submitting the transaction to consensus; + // leader transactions must not have one yet. + // This is a placeholder since in the near future the timestamp will be assigned + // within consensus. + // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798) + DCHECK(!transaction_->state()->has_timestamp()); + if (transaction_->state()->external_consistency_mode() == COMMIT_WAIT) { + transaction_->state()->set_timestamp(clock_->NowLatest()); + } else { + transaction_->state()->set_timestamp(clock_->Now()); + } + transaction_->state()->consensus_round()->replicate_msg()->set_timestamp( transaction_->state()->timestamp().ToUint64()); + RETURN_NOT_OK(transaction_->Start()); + VLOG_WITH_PREFIX(4) << "Triggering consensus repl"; // Trigger the consensus replication. 
- { std::lock_guard<simple_spinlock> lock(lock_); replication_state_ = REPLICATING; @@ -427,8 +441,7 @@ Status TransactionDriver::ApplyAsync() { DCHECK_EQ(prepare_state_, PREPARED); if (transaction_status_.ok()) { DCHECK_EQ(replication_state_, REPLICATED); - order_verifier_->CheckApply(op_id_copy_.index(), - prepare_physical_timestamp_); + order_verifier_->CheckApply(op_id_copy_.index(), prepare_physical_timestamp_); // Now that the transaction is committed in consensus advance the safe time. if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) { transaction_->state()->tablet_peer()->tablet()->mvcc_manager()-> http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_driver.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h index 0501299..1072703 100644 --- a/src/kudu/tablet/transactions/transaction_driver.h +++ b/src/kudu/tablet/transactions/transaction_driver.h @@ -24,6 +24,7 @@ #include "kudu/consensus/consensus.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/walltime.h" +#include "kudu/server/clock.h" #include "kudu/tablet/transactions/transaction.h" #include "kudu/util/status.h" #include "kudu/util/trace.h" @@ -96,6 +97,40 @@ class TransactionTracker; // // =========================================================================================== // +// Ordering requirements for lock acquisition and write (mvcc) transaction start +// +// On the leader side, starting the mvcc transaction for writes +// (calling tablet_->StartTransaction()) must always be done _after_ any relevant row locks are +// acquired (using AcquireLockForOp). This ensures that, within each row, timestamps only move +// forward. 
If we took a timestamp before getting the row lock, we could have the following +// situation: +// +// Thread 1 | Thread 2 +// ---------------------- +// Start tx 1 | +// | Start tx 2 +// | Obtain row lock +// | Update row +// | Commit tx 2 +// Obtain row lock | +// Delete row | +// Commit tx 1 +// +// This would cause the mutation list to look like: @t1: DELETE, @t2: UPDATE which is invalid, +// since we expect to be able to replay mutations in increasing timestamp order on a +// given row. +// +// This requirement is basically two-phase-locking: the order in which row locks are acquired for +// transactions determines their serialization order. If/when we support multi-node serializable +// transactions, we'll have to acquire _all_ row locks (across all nodes) before obtaining a +// timestamp. +// +// Note that on non-leader replicas this requirement is no longer relevant. The leader assigned +// the timestamps and serialized the transactions properly, so calling tablet_->StartTransaction() +// before lock acquisition on non-leader replicas is inconsequential. +// +// =========================================================================================== +// // Tracking transaction results for exactly once semantics // // Exactly once semantics for transactions require that the results of previous executions @@ -188,7 +223,8 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> { log::Log* log, ThreadPool* prepare_pool, ThreadPool* apply_pool, - TransactionOrderVerifier* order_verifier); + TransactionOrderVerifier* order_verifier, + scoped_refptr<server::Clock> clock); // Perform any non-constructor initialization. Sets the transaction // that will be executed. @@ -264,6 +300,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> { // The task submitted to the prepare threadpool to prepare and start // the transaction. If PrepareAndStart() fails, calls HandleFailure. 
void PrepareAndStartTask(); + // Actually prepare and start. Status PrepareAndStart(); @@ -317,6 +354,11 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> { // Lock that synchronizes access to the transaction's state. mutable simple_spinlock lock_; + // Temporarily have the clock on the driver so that we can assign timestamps to + // transactions. + // TODO(dralves) Remove this when the new TimeManager class gets in (part of KUDU-798). + scoped_refptr<server::Clock> clock_; + // A copy of the transaction's OpId, set when the transaction first // receives one from Consensus and uninitialized until then. // TODO(todd): we have three separate copies of this now -- in TransactionState, http://git-wip-us.apache.org/repos/asf/kudu/blob/b8093f03/src/kudu/tablet/transactions/transaction_tracker-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_tracker-test.cc b/src/kudu/tablet/transactions/transaction_tracker-test.cc index 6b45157..54ef6fe 100644 --- a/src/kudu/tablet/transactions/transaction_tracker-test.cc +++ b/src/kudu/tablet/transactions/transaction_tracker-test.cc @@ -88,12 +88,14 @@ class TransactionTrackerTest : public KuduTest { vector<scoped_refptr<TransactionDriver> >* drivers) { vector<scoped_refptr<TransactionDriver> > local_drivers; for (int i = 0; i < num_drivers; i++) { - scoped_refptr<TransactionDriver> driver(new TransactionDriver(&tracker_, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr)); + scoped_refptr<TransactionDriver> driver( + new TransactionDriver(&tracker_, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + scoped_refptr<server::Clock>())); gscoped_ptr<NoOpTransaction> tx(new NoOpTransaction(new NoOpTransactionState)); RETURN_NOT_OK(driver->Init(tx.PassAs<Transaction>(), consensus::LEADER)); local_drivers.push_back(driver);
