This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fa3b5712ec7495e89b32e91c6c8d09d558f7d4f9 Author: Andrew Wong <[email protected]> AuthorDate: Wed Sep 16 22:54:47 2020 -0700 KUDU-2612 p10: have timestamp assignment account for commit timestamps One of the hurdles to performing a transaction's commit on a participant is that the commit process must ensure repeatable reads. Without multi-op transactions, this is done via a dance between the TimeManager (the entity that tracks and assigns timestamps) and the MvccManager (the entity that tracks the lifecycle of ops). This patch extends the dance between the TimeManager and MvccManager to ensure that when a participant commits a transaction, all future ops will be assigned a higher timestamp. I found it time-consuming to peruse the existing codebase for how timestamp assignment ensured repeatable reads, so I added a block comment to time_manager.h describing how it does so. I also added a section about how this patch extends it to ensure repeatable reads in the context of transactions. 
Change-Id: I0412fd0cf778d96f3fe6b14624d8d06942f40e72 Reviewed-on: http://gerrit.cloudera.org:8080/16470 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/consensus/consensus_queue.cc | 5 +- src/kudu/consensus/time_manager-test.cc | 74 ++++++++++++ src/kudu/consensus/time_manager.cc | 13 +++ src/kudu/consensus/time_manager.h | 129 +++++++++++++++------ .../integration-tests/txn_participant-itest.cc | 121 ++++++++++++++++++- src/kudu/tablet/ops/participant_op.cc | 110 +++++++++++++----- src/kudu/tablet/ops/participant_op.h | 33 +++++- src/kudu/tablet/ops/write_op.cc | 2 +- src/kudu/tablet/ops/write_op.h | 2 +- src/kudu/tablet/tablet.cc | 23 +++- src/kudu/tablet/tablet.h | 14 ++- src/kudu/tablet/tablet_bootstrap.cc | 8 +- src/kudu/tablet/txn_participant.cc | 12 ++ src/kudu/tablet/txn_participant.h | 24 +++- 14 files changed, 487 insertions(+), 83 deletions(-) diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index 18c64d6..05aa800 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -411,8 +411,9 @@ Status PeerMessageQueue::AppendOperations(vector<ReplicateRefPtr> msgs, } // Update safe time in the TimeManager if we're leader. - // This will 'unpin' safe time advancement, which had stopped since we assigned a timestamp to - // the message. + // This will 'unpin' safe time advancement, which had stopped since we + // assigned a timestamp to the message. + // // Until we have leader leases, replicas only call this when the message is committed. 
if (queue_state_.mode == LEADER) { time_manager_->AdvanceSafeTimeWithMessage(*msgs.back()->get()); diff --git a/src/kudu/consensus/time_manager-test.cc b/src/kudu/consensus/time_manager-test.cc index bd02933..32608db 100644 --- a/src/kudu/consensus/time_manager-test.cc +++ b/src/kudu/consensus/time_manager-test.cc @@ -18,6 +18,7 @@ #include "kudu/consensus/time_manager.h" #include <memory> +#include <string> #include <thread> #include <vector> @@ -215,5 +216,78 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) { after_latch->Wait(); } +// Test simulating safe time advancement as should be performed by an op that +// finalizes a transaction's commit timestamp. The commit timestamp should be +// used as a lower bound on new op timestamps. +TEST_F(TimeManagerTest, TestUpdateClockWithCommitTimestamp) { + Timestamp init = clock_.Now(); + InitTimeManager(init); + time_manager_->SetLeaderMode(); + const auto kShortTimeout = MonoDelta::FromMilliseconds(10); + { + // Operate on a commit timestamp a while (five seconds) in the future. + Timestamp txn1_commit_ts(init.value() + 5000000000); + const auto safe_time_before_update = time_manager_->GetSafeTime(); + ASSERT_OK(time_manager_->UpdateClockAndLastAssignedTimestamp(txn1_commit_ts)); + + // The serial timestamp should have been bumped forward, but not the safe + // time, which is pinned until the call to AdvanceSafeTimeWithMessage(). + // Thus, the next timestamp assigned should be higher than the commit + // timestamp, but the safe time should still not have moved. 
+ ASSERT_EQ(safe_time_before_update, time_manager_->GetSafeTime()); + ReplicateMsg txn1_commit_replicate; + ASSERT_OK(time_manager_->AssignTimestamp(&txn1_commit_replicate)); + ASSERT_GT(txn1_commit_replicate.timestamp(), txn1_commit_ts.value()); + ASSERT_EQ(safe_time_before_update, time_manager_->GetSafeTime()); + Status s = time_manager_->WaitUntilSafe(txn1_commit_ts, MonoTime::Now() + kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + // Once the safe time is advanced, we should readily be able to wait for + // the safe time to pass. + time_manager_->AdvanceSafeTimeWithMessage(txn1_commit_replicate); + ASSERT_GT(time_manager_->GetSafeTime(), safe_time_before_update); + ASSERT_OK(time_manager_->WaitUntilSafe(txn1_commit_ts, MonoTime::Now() + kShortTimeout)); + } + + // If we update the clock with a timestamp in the past (e.g. if the commit + // timestamp assigned for a transaction is in the past), the last assigned + // timestamp should remain where it was, and since no ops are otherwise known + // to be in-flight, safe time should march forward. + { + Timestamp txn2_commit_ts(init); + const auto safe_time_before_update = time_manager_->GetSafeTime(); + ASSERT_OK(time_manager_->UpdateClockAndLastAssignedTimestamp(txn2_commit_ts)); + + // Safe time should move forward. + ASSERT_LT(safe_time_before_update, time_manager_->GetSafeTime()); + + // The next timestamp assigned should be higher than the commit timestamp, + // and safe time should be pinned until explicitly advanced. 
+ ReplicateMsg txn2_commit_replicate; + ASSERT_OK(time_manager_->AssignTimestamp(&txn2_commit_replicate)); + const auto commit_op_ts = Timestamp(txn2_commit_replicate.timestamp()); + ASSERT_GT(commit_op_ts, txn2_commit_ts); + ASSERT_GT(commit_op_ts, safe_time_before_update); + ASSERT_GT(commit_op_ts, time_manager_->GetSafeTime()); + ASSERT_OK(time_manager_->WaitUntilSafe(txn2_commit_ts, MonoTime::Now() + kShortTimeout)); + Status s = time_manager_->WaitUntilSafe(commit_op_ts, MonoTime::Now() + kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + // Once the safe time is bumped, it should be unpinned and return a value + // higher than any timestamp we've previously assigned. + time_manager_->AdvanceSafeTimeWithMessage(txn2_commit_replicate); + ASSERT_LT(txn2_commit_replicate.timestamp(), time_manager_->GetSafeTime().value()); + ASSERT_OK(time_manager_->WaitUntilSafe(txn2_commit_ts, MonoTime::Now() + kShortTimeout)); + ASSERT_OK(time_manager_->WaitUntilSafe(commit_op_ts, MonoTime::Now() + kShortTimeout)); + } + + // Finally, when in non-leader mode, bumping the last assigned timestamp is + // disallowed. + time_manager_->SetNonLeaderMode(); + const auto& now = clock_.Now(); + Status s = time_manager_->UpdateClockAndLastAssignedTimestamp(now); + ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); +} + } // namespace consensus } // namespace kudu diff --git a/src/kudu/consensus/time_manager.cc b/src/kudu/consensus/time_manager.cc index d84ddd3..ad421cd 100644 --- a/src/kudu/consensus/time_manager.cc +++ b/src/kudu/consensus/time_manager.cc @@ -139,6 +139,19 @@ Status TimeManager::MessageReceivedFromLeader(const ReplicateMsg& message) { return Status::OK(); } +Status TimeManager::UpdateClockAndLastAssignedTimestamp(const Timestamp& timestamp) { + RETURN_NOT_OK(clock_->Update(timestamp)); + Lock l(lock_); + if (PREDICT_FALSE(mode_ == NON_LEADER)) { + return Status::IllegalState(Substitute( + "Cannot bump the last assigned timestamp. 
Tablet is not " + "in leader mode. Last heard from a leader: $0 ago.", + (MonoTime::Now() - last_advanced_safe_time_).ToString())); + } + last_serial_ts_assigned_ = std::max(timestamp, last_serial_ts_assigned_); + return Status::OK(); +} + void TimeManager::AdvanceSafeTimeWithMessage(const ReplicateMsg& message) { Lock l(lock_); if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) { diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h index b574960..5dc9606 100644 --- a/src/kudu/consensus/time_manager.h +++ b/src/kudu/consensus/time_manager.h @@ -39,35 +39,79 @@ class ReplicateMsg; // Manages timestamp assignment to consensus rounds and safe time advancement. // -// Safe time corresponds to a timestamp before which all ops have been applied to the -// tablet or are in-flight and is a monotonically increasing timestamp (see note at the end -// of this class comment). +// The TimeManager is used in conjunction with the MvccManager to define and +// uphold the safe time, i.e. a timestamp before which all ops have been +// applied to the tablet or are in-flight, which is useful for enabling +// repeatable reads. // -// Snapshot scans can use WaitUntilSafe() to wait for a timestamp to be safe. After this method -// returns an OK status, all the ops whose timestamps fall before the scan's timestamp -// will be either committed or in-flight. If the scanner additionally uses the MvccManager to wait -// until the given timestamp is clean, then the read will be repeatable. +// Example of a leader TimeManager updating the safe time: +// - TimeManager::AssignTimestamp() returns a timestamp T that is higher than +// any other replicated by a leader. +// - An op is registered with the MvccManager with the timestamp T. +// Conceptually, T is thus considered safe. Since all future ops will be +// assigned higher timestamps, all ops that would have been assigned lower +// timestamps are already complete or in-flight. 
+// - The op is written to the WAL via the ConsensusQueue, and then replicated; +// TimeManager::AdvanceSafeTimeWithMessage() is called with the replicate +// message of the op with timestamp T, signifying the new safe time. +// - Until this step completes, snapshot scans, which call WaitUntilSafe(t) +// where t >= T, will block. // -// In leader mode the TimeManager is responsible for assigning timestamps to ops -// and for moving the leader's safe time, which in turn may be sent to replicas on heartbeats -// moving their safe time. The leader's safe time moves with the clock unless there has been a -// op that was assigned a timestamp that is not yet known by the queue -// (i.e. AdvanceSafeTimeWithMessage() hasn't been called on the corresponding message). -// In this case the TimeManager returns the last known safe time. +// On followers, safe time is advanced via the following methods, called from +// the Raft threads: +// - MessageReceivedFromLeader() is used by followers on replicate messages +// received from the leader to ensure that, if the follower were to become +// leader, any timestamps assigned would be higher than any previously +// replicated timestamps. This does not bump safe time, but it does ensure +// that, were this node to become leader, any later assigned timestamp will +// be higher than T. +// - AdvanceSafeTime() is used by followers using the safe timestamps that are +// heartbeated from the leader, used for the sake of snapshot scans. // -// On non-leader mode this class tracks the safe time sent by the leader and updates waiters -// when it advances. +// The leader's safe time moves with its clock unless there has been an op that +// was assigned a timestamp that is not yet known by the queue (i.e. +// AdvanceSafeTimeWithMessage() hasn't been called on the corresponding +// message). In this case the TimeManager returns the last known safe time. 
// -// This class's leadership status is meant to be in tune with the queue's as the queue -// is responsible for broadcasting safe time from a leader (and will eventually be responsible -// for calculating that leader's lease). +// This class's leadership status is meant to be in tune with the queue's as +// the queue is responsible for broadcasting safe time from a leader (and will +// eventually be responsible for calculating that leader's lease). // -// NOTE: Until leader leases are implemented the cluster's safe time can occasionally move back. -// This does not mean, however, that the timestamp returned by GetSafeTime() can move back. -// GetSafeTime will still return monotonically increasing timestamps, it's just -// that, in certain corner cases, the timestamp returned by GetSafeTime() can't be trusted -// to mean that all future messages will be assigned future timestamps. -// This anomaly can cause non-repeatable reads in certain conditions. +// NOTE: Until leader leases are implemented the cluster's safe time can +// occasionally move back. This does not mean, however, that the timestamp +// returned by GetSafeTime() can move back. GetSafeTime will still return +// monotonically increasing timestamps, it's just that, in certain corner +// cases, the timestamp returned by GetSafeTime() can't be trusted to mean that +// all future messages will be assigned future timestamps. This anomaly can +// cause non-repeatable reads in certain conditions. +// +// Multi-op transactions +// --------------------- +// Transaction participant leaders also help orchestrate the assignment of its +// commit timestamp. Below is an example of how the TimeManager and MvccManager +// can be used to handle assignment of a commit timestamp: +// - The BEGIN_COMMIT op is assigned a timestamp T_bc by the participant +// leader via the above steps, denoting T_bc as safe. +// - Unlike a regular (e.g. 
write) op, the MVCC op registered for BEGIN_COMMIT +// is not finished when the op is applied -- instead, the MVCC op is +// maintained in memory for the time being. +// - Until the MVCC op is completed below, further snapshot scans at t where +// t > T_bc will block. +// - T_bc is sent to the transaction coordinator, and a commit timestamp +// T_commit is determined that is higher than all timestamps that were +// returned by the participants. +// - The coordinator effects a FINALIZE_COMMIT op on participants; the request +// contains T_commit. +// - Before starting the FINALIZE_COMMIT op, the leader updates its clock using +// TimeManager::UpdateClockAndLastAssignedTimestamp(), guaranteeing the next +// timestamp assigned will be higher than T_commit. +// - The FINALIZE_COMMIT op is assigned a timestamp T_fc > T_commit by the +// participant leader via the above steps, denoting T_fc as safe. +// - The MVCC op from earlier is used in lieu of a new MVCC op; since this op +// has been in-flight for the duration of the commit process, any scans at +// time t >= T_commit must have called WaitUntilSafe(t), followed by +// MvccManager::WaitForSnapshotWithAllCommitted(t), thereby waiting for all +// ops below t to complete. // // This class is thread safe. class TimeManager { @@ -82,25 +126,40 @@ class TimeManager { // Sets this TimeManager to non-leader mode. void SetNonLeaderMode(); - // Assigns a timestamp to 'message' according to the message's ExternalConsistencyMode and/or - // message type. + // Assigns a timestamp to 'message' according to the message's + // ExternalConsistencyMode and/or message type. // - // Note that the timestamp in 'message' is not considered safe until the message has - // been appended to the queue. Until then safe time is pinned to the last known value. - // When the message is appended later on, AdvanceSafeTimeWithMessage() is called and safe time - // is advanced. 
+ // The timestamp in 'message' is not considered safe until the message has + // been written to the WAL and begun replicating to followers. Until then, + // safe time is pinned to the last known value. When the message is appended + // later on, AdvanceSafeTimeWithMessage() is called and safe time is + // advanced. // // Requires Leader mode (non-OK status otherwise). Status AssignTimestamp(ReplicateMsg* message); - // Updates the internal state based on 'message' received from a leader replica. - // Replicas are expected to call this for every message received from a valid leader. + // Updates the internal state based on 'message' received from a leader + // replica. Replicas are expected to call this for every message received + // from a valid leader. // - // Returns Status::OK if the message/leader is valid and the clock was correctly updated. + // Returns Status::OK if the message/leader is valid and the clock was + // correctly updated. // // Requires non-leader mode (CHECK failure if it isn't). Status MessageReceivedFromLeader(const ReplicateMsg& message); + // Updates the clock to move forward to 'timestamp' if it is in the future, + // and updates internal state to indicate that all further timestamps + // assigned should be higher than 'timestamp'. + // + // It is expected that leader participants of a transaction call this before + // assigning a timestamp to the op that finalizes the commit, ensuring that + // the finalizing op will be assigned a later timestamp. + // + // Returns a not-OK status if called while not the leader or if there is an + // error updating the clock ('timestamp' was too far in the future). + Status UpdateClockAndLastAssignedTimestamp(const Timestamp& timestamp); + // Advances safe time based on the timestamp and type of 'message'. // // This only moves safe time if 'message's timestamp is higher than the currently known one. @@ -198,8 +257,8 @@ class TimeManager { // The last serial timestamp that was assigned. 
Timestamp last_serial_ts_assigned_; - // On replicas this is the latest safe time received from the leader, on the leader this is - // the last serial timestamp appended to the queue. + // On followers this is the latest safe time received from the leader, on the + // leader this is the last serial timestamp appended to the queue. Timestamp last_safe_ts_; // The last time we advanced safe time. diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc index 1656bd2..90addc3 100644 --- a/src/kudu/integration-tests/txn_participant-itest.cc +++ b/src/kudu/integration-tests/txn_participant-itest.cc @@ -28,13 +28,17 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "kudu/clock/clock.h" +#include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol.h" #include "kudu/consensus/raft_consensus.h" +#include "kudu/consensus/time_manager.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/internal_mini_cluster.h" +#include "kudu/tablet/mvcc.h" #include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tablet/txn_participant-test-util.h" @@ -77,7 +81,8 @@ namespace itest { namespace { vector<Status> RunOnReplicas(const vector<TabletReplica*>& replicas, int64_t txn_id, - ParticipantOpPB::ParticipantOpType type) { + ParticipantOpPB::ParticipantOpType type, + int64_t commit_timestamp = kDummyCommitTimestamp) { vector<Status> statuses(replicas.size(), Status::Incomplete("")); vector<thread> threads; for (int i = 0; i < replicas.size(); i++) { @@ -95,6 +100,7 @@ vector<Status> RunOnReplicas(const vector<TabletReplica*>& replicas, } return statuses; } + string TxnsAsString(const vector<TxnParticipant::TxnEntry>& txns) { return JoinMapped(txns, [](const TxnParticipant::TxnEntry& txn) { @@ -103,6 +109,25 @@ string TxnsAsString(const 
vector<TxnParticipant::TxnEntry>& txns) { }, ","); } + +Status RunOnReplica(TabletReplica* replica, int64_t txn_id, + ParticipantOpPB::ParticipantOpType type, + int64_t commit_timestamp = kDummyCommitTimestamp) { + return RunOnReplicas({ replica }, txn_id, type, commit_timestamp)[0]; +} + +// Emulate a snapshot scan by waiting for the safe time to be advanced past 't' +// and for all ops before 't' to complete. +Status WaitForCompletedOps(TabletReplica* replica, Timestamp t, MonoDelta timeout) { + const auto deadline = MonoTime::Now() + timeout; + RETURN_NOT_OK_PREPEND( + replica->time_manager()->WaitUntilSafe(t, deadline), "Failed to wait for safe time"); + tablet::MvccSnapshot snap; + RETURN_NOT_OK_PREPEND( + replica->tablet()->mvcc_manager()->WaitForSnapshotWithAllApplied(t, &snap, deadline), + "Failed to wait for ops to complete"); + return Status::OK(); +} } // anonymous namespace class TxnParticipantITest : public KuduTest { @@ -250,6 +275,100 @@ TEST_F(TxnParticipantITest, TestCopyParticipantOps) { }); } +// Test to ensure that the mechanisms built to allow snapshot scans to wait for +// safe time advancement will actually wait for transactions to commit. +TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) { + const int kLeaderIdx = 0; + vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx); + auto* leader_replica = replicas[kLeaderIdx]; + auto* follower_replica = replicas[kLeaderIdx + 1]; + auto* clock = leader_replica->clock(); + const int64_t kTxnId = 1; + ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN)); + + // We should consistently be able to scan a bit in the future just by + // waiting on leaders. Leader safe times move forward with its clock, while + // followers need to wait to be heartbeated to. Wait on heartbeats (wait 2x + // the heartbeat time to avoid flakiness). 
+ const auto kShortTimeout = MonoDelta::FromMilliseconds(10); + const auto before_commit_ts = clock->Now(); + ASSERT_OK(WaitForCompletedOps(leader_replica, before_commit_ts, kShortTimeout)); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2)); + ASSERT_OK(WaitForCompletedOps(follower_replica, before_commit_ts, kShortTimeout)); + + // Once we begin committing, safe time will be pinned. To ensure repeatable + // reads, scans asking for a timestamp higher than the BEGIN_COMMIT op's + // timestamp should wait. + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_COMMIT)); + const auto before_finalize_ts = clock->Now(); + Status s; + s = WaitForCompletedOps(leader_replica, before_finalize_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + s = WaitForCompletedOps(follower_replica, before_finalize_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + // Even if we wait for heartbeats to happen, safe time will not be advanced + // until the commit is finalized. + SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2)); + s = WaitForCompletedOps(leader_replica, before_finalize_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + s = WaitForCompletedOps(follower_replica, before_finalize_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + // Just because safe time is pinned doesn't mean we can't scan anything. We + // should still be able to wait for older timestamps. + ASSERT_OK(WaitForCompletedOps(leader_replica, before_commit_ts, kShortTimeout)); + ASSERT_OK(WaitForCompletedOps(follower_replica, before_commit_ts, kShortTimeout)); + + // Once we finalize the commit, safe time will continue forward. 
+ const auto commit_ts_val = clock->Now().value() + 10000; + const auto commit_ts = Timestamp(commit_ts_val); + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, commit_ts_val)); + ASSERT_OK(WaitForCompletedOps(leader_replica, before_finalize_ts, kShortTimeout)); + ASSERT_OK(WaitForCompletedOps(follower_replica, before_finalize_ts, kShortTimeout)); + ASSERT_OK(WaitForCompletedOps(leader_replica, commit_ts, kShortTimeout)); + ASSERT_OK(WaitForCompletedOps(follower_replica, commit_ts, kShortTimeout)); +} + +// Like the above test, but ensures we can wait properly even if the +// transaction is aborted. +TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) { + const int kLeaderIdx = 0; + vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx); + auto* leader_replica = replicas[kLeaderIdx]; + auto* follower_replica = replicas[kLeaderIdx + 1]; + auto* clock = leader_replica->clock(); + const int64_t kTxnId = 1; + ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN)); + + const auto kShortTimeout = MonoDelta::FromMilliseconds(10); + const auto before_commit_ts = clock->Now(); + // Once we begin committing, safe time will be pinned. To ensure repeatable + // reads, scans asking for a timestamp higher than the BEGIN_COMMIT op's + // timestamp should wait. + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_COMMIT)); + const auto before_abort_ts = clock->Now(); + Status s = WaitForCompletedOps(leader_replica, before_abort_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + s = WaitForCompletedOps(follower_replica, before_abort_ts, kShortTimeout); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + // Just because safe time is pinned doesn't mean we can't scan anything. We + // should still be able to wait for older timestamps. 
+ ASSERT_OK(WaitForCompletedOps(leader_replica, before_commit_ts, kShortTimeout)); + ASSERT_OK(WaitForCompletedOps(follower_replica, before_commit_ts, kShortTimeout)); + + // When we abort the transaction, safe time should continue to move forward. + // On followers, this will happen via heartbeats, so we need to wait to + // heartbeat (wait 2x the heartbeat time to avoid flakiness). + ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::ABORT_TXN)); + ASSERT_OK(WaitForCompletedOps(leader_replica, before_abort_ts, kShortTimeout)); + SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2)); + ASSERT_OK(WaitForCompletedOps(follower_replica, before_abort_ts, kShortTimeout)); +} + class TxnParticipantElectionStormITest : public TxnParticipantITest { public: void SetUp() override { diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc index 25b70c6..4251bd2 100644 --- a/src/kudu/tablet/ops/participant_op.cc +++ b/src/kudu/tablet/ops/participant_op.cc @@ -27,10 +27,13 @@ #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/raft_consensus.h" +#include "kudu/consensus/time_manager.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/tablet/mvcc.h" #include "kudu/tablet/ops/op.h" +#include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tablet/txn_participant.h" #include "kudu/util/debug/trace_event.h" @@ -106,35 +109,16 @@ Status ParticipantOpState::ValidateOp() const { return Status::OK(); } -Status ParticipantOpState::PerformOp(const OpId& op_id) { - const auto& op = request()->op(); - Txn* txn = txn_.get(); - Status s; - switch (op.type()) { - // NOTE: these can currently never fail because we are only updating - // metadata. When we begin validating write ops before committing, we'll - // need to populate the response with errors. 
- case ParticipantOpPB::BEGIN_TXN: { - txn->BeginTransaction(op_id); - break; - } - case ParticipantOpPB::BEGIN_COMMIT: { - txn->BeginCommit(op_id); - break; - } - case ParticipantOpPB::FINALIZE_COMMIT: { - txn->FinalizeCommit(op_id, op.finalized_commit_timestamp()); - break; - } - case ParticipantOpPB::ABORT_TXN: { - txn->AbortTransaction(op_id); - break; - } - case ParticipantOpPB::UNKNOWN: { - return Status::InvalidArgument("unknown op type"); - } - } - return Status::OK(); +void ParticipantOpState::SetMvccOp(unique_ptr<ScopedOp> mvcc_op) { + DCHECK_EQ(ParticipantOpPB::BEGIN_COMMIT, request()->op().type()); + DCHECK(nullptr == begin_commit_mvcc_op_); + begin_commit_mvcc_op_ = std::move(mvcc_op); +} + +void ParticipantOpState::ReleaseMvccOpToTxn() { + DCHECK_EQ(ParticipantOpPB::BEGIN_COMMIT, request()->op().type()); + DCHECK(begin_commit_mvcc_op_); + txn_->SetCommitOp(std::move(begin_commit_mvcc_op_)); } void ParticipantOp::NewReplicateMsg(unique_ptr<ReplicateMsg>* replicate_msg) { @@ -151,6 +135,15 @@ Status ParticipantOp::Prepare() { TRACE("PREPARE: Starting."); state_->AcquireTxnAndLock(); RETURN_NOT_OK(state_->ValidateOp()); + + // Before we assign a timestamp, bump the clock so further ops get assigned + // higher timestamps (including this one). 
+ if (state_->request()->op().type() == ParticipantOpPB::FINALIZE_COMMIT && + type() == consensus::LEADER) { + DCHECK(!state_->consensus_round()->replicate_msg()->has_timestamp()); + RETURN_NOT_OK(state_->tablet_replica()->time_manager()->UpdateClockAndLastAssignedTimestamp( + state_->commit_timestamp())); + } TRACE("PREPARE: Finished."); return Status::OK(); } @@ -159,16 +152,68 @@ Status ParticipantOp::Start() { DCHECK(!state_->has_timestamp()); DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp()); state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp())); + if (state_->request()->op().type() == ParticipantOpPB::BEGIN_COMMIT) { + // When beginning to commit, register an MVCC op so scanners at later + // timestamps wait for the commit to complete. + state_->tablet_replica()->tablet()->StartOp(state_.get()); + } TRACE("START. Timestamp: $0", clock::HybridClock::GetPhysicalValueMicros(state_->timestamp())); return Status::OK(); } +Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, CommitMsg** commit_msg) { + const auto& op = request()->op(); + const auto& op_type = request()->op().type(); + Status s; + switch (op_type) { + // NOTE: these can currently never fail because we are only updating + // metadata. When we begin validating write ops before committing, we'll + // need to populate the response with errors. + case ParticipantOpPB::BEGIN_TXN: { + txn_->BeginTransaction(op_id); + break; + } + case ParticipantOpPB::BEGIN_COMMIT: { + // TODO(awong): Wait for all ops below this timestamp to complete. + txn_->BeginCommit(op_id); + break; + } + case ParticipantOpPB::FINALIZE_COMMIT: { + txn_->FinalizeCommit(op_id, op.finalized_commit_timestamp()); + // NOTE: we may not have a commit op if we are bootstrapping. + // TODO(awong): consider not replaying the FINALIZE_COMMIT unless the + // BEGIN_COMMIT also needs to be replayed. 
+ if (txn_->commit_op()) { + txn_->commit_op()->FinishApplying(); + } + break; + } + case ParticipantOpPB::ABORT_TXN: { + txn_->AbortTransaction(op_id); + if (txn_->commit_op()) { + txn_->commit_op()->Abort(); + } + break; + } + case ParticipantOpPB::UNKNOWN: { + return Status::InvalidArgument("unknown op type"); + } + } + *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(pb_arena()); + (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP); + return Status::OK(); +} + Status ParticipantOp::Apply(CommitMsg** commit_msg) { TRACE_EVENT0("op", "ParticipantOp::Apply"); TRACE("APPLY: Starting."); - CHECK_OK(state_->PerformOp(state()->op_id())); - *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena()); - (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP); + state_->tablet_replica()->tablet()->StartApplying(state_.get()); + CHECK_OK(state_->PerformOp(state()->op_id(), commit_msg)); + // If this is a BEGIN_COMMIT op, pass the commit's MVCC op to the + // transaction, keeping it open until the commit is finalized or aborted. + if (state_->request()->op().type() == ParticipantOpPB::BEGIN_COMMIT) { + state_->ReleaseMvccOpToTxn(); + } TRACE("APPLY: Finished."); return Status::OK(); } @@ -182,6 +227,7 @@ void ParticipantOp::Finish(OpResult result) { TRACE("FINISH: Op aborted"); return; } + DCHECK_EQ(result, Op::APPLIED); // TODO(awong): when implementing transaction cleanup on participants, clean // up finalized and aborted transactions here. 
diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h index 11e9e04..f4aa299 100644 --- a/src/kudu/tablet/ops/participant_op.h +++ b/src/kudu/tablet/ops/participant_op.h @@ -22,8 +22,12 @@ #include <string> #include <utility> +#include <glog/logging.h> + +#include "kudu/common/timestamp.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/tablet/mvcc.h" #include "kudu/tablet/ops/op.h" #include "kudu/tablet/txn_participant.h" #include "kudu/tserver/tserver_admin.pb.h" @@ -68,7 +72,9 @@ class ParticipantOpState : public OpState { // // Anchors the given 'op_id' in the WAL, ensuring that subsequent bootstraps // of the tablet's WAL will leave the transaction in the appropriate state. - Status PerformOp(const consensus::OpId& op_id); + // Updates 'commit_msg' to include a commit message appropriate for this op + // using this op's arena. + Status PerformOp(const consensus::OpId& op_id, consensus::CommitMsg** commit_msg); // Releases the transaction and its lock. void ReleaseTxn(); @@ -77,6 +83,22 @@ class ParticipantOpState : public OpState { int64_t txn_id() { return request_->op().txn_id(); } + + Txn* txn() { + return txn_.get(); + } + + // Takes ownership of the scoped op, using it to track the commit op. + void SetMvccOp(std::unique_ptr<ScopedOp> mvcc_op); + + // Releases the commit op to the Txn; it is expected that the Txn will + // finish the MVCC op once FINALIZE_COMMIT or ABORT_TXN are called. + void ReleaseMvccOpToTxn(); + + Timestamp commit_timestamp() const { + CHECK(request()->op().has_finalized_commit_timestamp()); + return Timestamp(request()->op().finalized_commit_timestamp()); + } private: friend class ParticipantOp; @@ -88,6 +110,12 @@ class ParticipantOpState : public OpState { // TabletReplica if, for instance, we're bootstrapping a new Tablet. TxnParticipant* txn_participant_; + // MVCC op used to track the commit process of a transaction. 
This should be + // created only when starting a BEGIN_COMMIT op, and it should be released to + // the underlying Txn to track the commit's progress to its eventual + // FINALIZE_COMMIT or ABORT_TXN call. + std::unique_ptr<ScopedOp> begin_commit_mvcc_op_; + const tserver::ParticipantRequestPB* request_; tserver::ParticipantResponsePB* response_; @@ -96,6 +124,9 @@ class ParticipantOpState : public OpState { }; // Op that executes a transaction state change in the transaction participant. +// This op is used to orchestrate the transaction commit in such a way that it +// guarantees repeatable reads. See the block comment in time_manager.h for +// details on how this dance is performed. class ParticipantOp : public Op { public: ParticipantOp(std::unique_ptr<ParticipantOpState> state, diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc index 87c9dc5..ec71baf 100644 --- a/src/kudu/tablet/ops/write_op.cc +++ b/src/kudu/tablet/ops/write_op.cc @@ -323,7 +323,7 @@ WriteOpState::WriteOpState(TabletReplica* tablet_replica, } } -void WriteOpState::SetMvccTx(unique_ptr<ScopedOp> mvcc_op) { +void WriteOpState::SetMvccOp(unique_ptr<ScopedOp> mvcc_op) { DCHECK(!mvcc_op_) << "Mvcc op already started/set."; mvcc_op_ = std::move(mvcc_op); } diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h index 3c69eac..771ade7 100644 --- a/src/kudu/tablet/ops/write_op.h +++ b/src/kudu/tablet/ops/write_op.h @@ -149,7 +149,7 @@ class WriteOpState : public OpState { // This must be called exactly once, after the timestamp was acquired. // This also copies the timestamp from the MVCC op into the // WriteOpState object. - void SetMvccTx(std::unique_ptr<ScopedOp> mvcc_op); + void SetMvccOp(std::unique_ptr<ScopedOp> mvcc_op); // Set the Tablet components that this op will write into. 
// Called exactly once at the beginning of Apply, before applying its diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 811605e..ab9b347 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -64,6 +64,7 @@ #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/memrowset.h" #include "kudu/tablet/ops/alter_schema_op.h" +#include "kudu/tablet/ops/participant_op.h" #include "kudu/tablet/ops/write_op.h" #include "kudu/tablet/row_op.h" #include "kudu/tablet/rowset_info.h" @@ -74,6 +75,7 @@ #include "kudu/tablet/tablet_metrics.h" #include "kudu/tablet/tablet_mm_ops.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/tserver/tserver_admin.pb.h" #include "kudu/util/bitmap.h" #include "kudu/util/bloom_filter.h" #include "kudu/util/debug/trace_event.h" @@ -577,7 +579,15 @@ void Tablet::StartOp(WriteOpState* op_state) { unique_ptr<ScopedOp> mvcc_op; DCHECK(op_state->has_timestamp()); mvcc_op.reset(new ScopedOp(&mvcc_, op_state->timestamp())); - op_state->SetMvccTx(std::move(mvcc_op)); + op_state->SetMvccOp(std::move(mvcc_op)); +} + +void Tablet::StartOp(ParticipantOpState* op_state) { + if (op_state->request()->op().type() == tserver::ParticipantOpPB::BEGIN_COMMIT) { + DCHECK(op_state->has_timestamp()); + unique_ptr<ScopedOp> mvcc_op(new ScopedOp(&mvcc_, op_state->timestamp())); + op_state->SetMvccOp(std::move(mvcc_op)); + } } bool Tablet::ValidateOpOrMarkFailed(RowOp* op) { @@ -842,6 +852,17 @@ void Tablet::StartApplying(WriteOpState* op_state) { op_state->set_tablet_components(components_); } +void Tablet::StartApplying(ParticipantOpState* op_state) { + const auto& op_type = op_state->request()->op().type(); + if (op_type == tserver::ParticipantOpPB::FINALIZE_COMMIT) { + // NOTE: we may not have an MVCC op if we are bootstrapping and did not + // replay a BEGIN_COMMIT op. 
+ if (op_state->txn()->commit_op()) { + op_state->txn()->commit_op()->StartApplying(); + } + } +} + Status Tablet::BulkCheckPresence(const IOContext* io_context, WriteOpState* op_state) { int num_ops = op_state->row_ops().size(); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 2b642c2..191a917 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -80,6 +80,7 @@ class AlterSchemaOpState; class CompactionPolicy; class HistoryGcOpts; class MemRowSet; +class ParticipantOpState; class RowSetTree; class RowSetsInCompaction; class WriteOpState; @@ -151,6 +152,7 @@ class Tablet { // TODO(todd): rename this to something like "FinishPrepare" or "StartApply", since // it's not the first thing in an op! void StartOp(WriteOpState* op_state); + void StartOp(ParticipantOpState* op_state); // Like the above but actually assigns the timestamp. Only used for tests that // don't boot a tablet server. @@ -158,6 +160,7 @@ class Tablet { // Signal that the given op is about to Apply. void StartApplying(WriteOpState* op_state); + void StartApplying(ParticipantOpState* op_state); // Apply all of the row operations associated with this op. Status ApplyRowOperations(WriteOpState* op_state) WARN_UNUSED_RESULT; @@ -727,10 +730,6 @@ class Tablet { scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; TabletMemTrackers mem_trackers_; - // Maintains the set of in-flight transactions, and any WAL anchors - // associated with them. - TxnParticipant txn_participant_; - scoped_refptr<MetricEntity> metric_entity_; std::unique_ptr<TabletMetrics> metrics_; @@ -742,6 +741,13 @@ class Tablet { clock::Clock* clock_; MvccManager mvcc_; + + // Maintains the set of in-flight transactions, and any WAL anchors + // associated with them. + // NOTE: the participant may retain MVCC ops, so define it after the + // MvccManager, to ensure those ops get destructed before the MvccManager. 
+ TxnParticipant txn_participant_; + LockManager lock_manager_; std::unique_ptr<CompactionPolicy> compaction_policy_; diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index 298c962..89dd6a7 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -1552,9 +1552,15 @@ Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_contex SCOPED_CLEANUP({ op_state.ReleaseTxn(); }); + LogEntryPB& commit_entry = *google::protobuf::Arena::CreateMessage<LogEntryPB>( + op_state.pb_arena()); + commit_entry.set_type(log::COMMIT); + CommitMsg* new_commit = commit_entry.mutable_commit(); + new_commit->CopyFrom(commit_msg); // NOTE: don't bother validating the current state of the op. Presumably that // happened the first time this op was written. - RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id())); + tablet_->StartApplying(&op_state); + RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id(), &new_commit)); return AppendCommitMsg(commit_msg); } diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc index 8ade9ec..7424c7b 100644 --- a/src/kudu/tablet/txn_participant.cc +++ b/src/kudu/tablet/txn_participant.cc @@ -19,6 +19,7 @@ #include <algorithm> #include <cstdint> +#include <ostream> #include <unordered_map> #include <utility> #include <vector> @@ -34,6 +35,17 @@ using strings::Substitute; namespace kudu { namespace tablet { +Txn::~Txn() { + CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&log_anchor_)); + // As a sanity check, make sure our state makes sense: if we have an MVCC op + // for commit, we should have started to commit. 
+ if (commit_op_) { + DCHECK(state_ == kCommitInProgress || + state_ == kCommitted || + state_ == kAborted) << StateToString(state_); + } +} + void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock) { std::unique_lock<rw_semaphore> l(state_lock_); *txn_lock = std::move(l); diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h index 098d32b..eb7b741 100644 --- a/src/kudu/tablet/txn_participant.h +++ b/src/kudu/tablet/txn_participant.h @@ -18,8 +18,10 @@ #include <atomic> #include <cstdint> +#include <memory> #include <mutex> #include <unordered_map> +#include <utility> #include <vector> #include <glog/logging.h> @@ -30,6 +32,7 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/tablet/mvcc.h" #include "kudu/util/locks.h" #include "kudu/util/rw_semaphore.h" #include "kudu/util/status.h" @@ -93,10 +96,7 @@ class Txn : public RefCountedThreadSafe<Txn> { log_anchor_registry_(log_anchor_registry), state_(kInitializing), commit_timestamp_(-1) {} - - ~Txn() { - CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&log_anchor_)); - } + ~Txn(); // Takes the state lock in write mode and returns it. As transaction state is // meant to be driven via an op driver, lock acquisition is expected to be @@ -179,6 +179,15 @@ class Txn : public RefCountedThreadSafe<Txn> { return commit_timestamp_; } + void SetCommitOp(std::unique_ptr<ScopedOp> commit_op) { + DCHECK(nullptr == commit_op_.get()); + commit_op_ = std::move(commit_op); + } + + ScopedOp* commit_op() { + return commit_op_.get(); + } + private: friend class RefCountedThreadSafe<Txn>; @@ -215,6 +224,13 @@ class Txn : public RefCountedThreadSafe<Txn> { // transaction should be applied, and -1 otherwise. 
std::atomic<int64_t> commit_timestamp_; + // Scoped op with a lifecycle that spans between the BEGIN_COMMIT op and + // corresponding FINALIZE_COMMIT or ABORT_TXN op, used to ensure that + // scanners wait until we finish the transaction if we've begun committing, + // before proceeding with a scan. This ensures scans on this participant are + // repeatable. + std::unique_ptr<ScopedOp> commit_op_; + DISALLOW_COPY_AND_ASSIGN(Txn); };
