This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 90aa4fa7d1527f376803440a4642668e3d798748 Author: Andrew Wong <[email protected]> AuthorDate: Tue Oct 6 00:06:49 2020 -0700 KUDU-2612 p11: persist txn metadata in superblock Currently, transaction metadata is entirely stored in the WALs; once a transaction is begun, it anchors WALs indefinitely. This is untenable, as any tablet that participates in a transaction would never be able to GC its WALs. This patch addresses this by storing transaction metadata in the tablet superblock for the following participant ops: - BEGIN_TXN: an empty record is added to the superblock for the given transaction ID. In the future, this can be used to store the owner of the transaction. - FINALIZE_COMMIT: the commit timestamp is added to the superblock for the given transaction ID. The commit timestamp of a transaction will be used when scanning over mutations applied to a tablet instead of the apply timestamp. - ABORT_TXN: an abort bit is added to the superblock for the given transaction ID. Upon applying each of these ops, the op's OpId is anchored in the WALs until the tablet metadata is successfully flushed, ensuring that if we don't flush the tablet metadata before restarting, we can rebuild any ops whose mutations had not been persisted by replaying the WALs. What about BEGIN_COMMIT ops? While these ops will still need to be anchored, BEGIN_COMMIT ops don't need to persist any long-term information. As such, their anchoring is slightly different -- rather than mutating the tablet metadata and unanchoring on a metadata flush, BEGIN_COMMIT ops will update the in-memory state for the in-flight Txn, and unanchor once the corresponding FINALIZE_COMMIT or ABORT_TXN begins applying. On bootstrap, the following rules are applied: - If we've persisted terminal information (i.e. abort or commit) about a transaction ID, we can skip replaying any participant op for that transaction. 
- If we otherwise have a persisted record for the transaction ID, we should start an open transaction and replay all participant ops for that transaction. - If we have no record of a transaction persisted, we must replay all participant ops for the given transaction. While storing things in the superblock is not ideal, there are further improvements that can be made to reduce its size, e.g. after finalizing a transaction, the transaction's mutations may be merged in with the rest of the tablet; once all mutations of a given transaction have been merged, the transaction metadata may be removed from the metadata. Change-Id: I2f32808aef10484f4e0ad3942bb005f61fbdb34a Reviewed-on: http://gerrit.cloudera.org:8080/16492 Tested-by: Andrew Wong <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> --- .../integration-tests/txn_participant-itest.cc | 81 +++++++- src/kudu/tablet/metadata.proto | 20 ++ src/kudu/tablet/ops/participant_op.cc | 40 ++-- src/kudu/tablet/ops/participant_op.h | 6 +- src/kudu/tablet/tablet.cc | 37 +++- src/kudu/tablet/tablet.h | 25 ++- src/kudu/tablet/tablet_bootstrap.cc | 56 ++++-- src/kudu/tablet/tablet_metadata-test.cc | 75 ++++++++ src/kudu/tablet/tablet_metadata.cc | 94 +++++++++ src/kudu/tablet/tablet_metadata.h | 83 +++++++- src/kudu/tablet/txn_participant-test.cc | 211 +++++++++++++++------ src/kudu/tablet/txn_participant.cc | 56 +++++- src/kudu/tablet/txn_participant.h | 64 +++++-- 13 files changed, 727 insertions(+), 121 deletions(-) diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc index 90addc3..1d41573 100644 --- a/src/kudu/integration-tests/txn_participant-itest.cc +++ b/src/kudu/integration-tests/txn_participant-itest.cc @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <thread> +#include <unordered_map> #include <utility> #include <vector> @@ -34,12 +35,15 @@ #include "kudu/consensus/raft_consensus.h" #include "kudu/consensus/time_manager.h" 
#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/tablet/mvcc.h" #include "kudu/tablet/tablet.h" +#include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tablet/txn_participant-test-util.h" #include "kudu/tablet/txn_participant.h" @@ -48,6 +52,7 @@ #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/tserver/tserver_admin.pb.h" +#include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -56,9 +61,15 @@ DECLARE_bool(raft_enable_pre_election); DECLARE_double(leader_failure_max_missed_heartbeat_periods); DECLARE_int32(consensus_inject_latency_ms_in_notifications); +DECLARE_int32(flush_threshold_secs); +DECLARE_int32(flush_threshold_mb); DECLARE_int32(follower_unavailable_considered_failed_sec); +DECLARE_int32(log_segment_size_mb); +DECLARE_int32(maintenance_manager_polling_interval_ms); DECLARE_int32(raft_heartbeat_interval_ms); +METRIC_DECLARE_histogram(log_gc_duration); + using kudu::cluster::InternalMiniCluster; using kudu::cluster::InternalMiniClusterOptions; using kudu::tablet::kCommitSequence; @@ -72,6 +83,7 @@ using kudu::tserver::ParticipantResponsePB; using std::string; using std::thread; using std::unique_ptr; +using std::unordered_map; using std::vector; using strings::Substitute; @@ -116,8 +128,8 @@ Status RunOnReplica(TabletReplica* replica, int64_t txn_id, return RunOnReplicas({ replica }, txn_id, type, commit_timestamp)[0]; } -// Emulate a snapshot scan by waiting for the safe time to be advanced past 't' -// and for all ops before 't' to complete. 
+// Emulates a snapshot scan by waiting for the safe time to be advanced past +// 't' and for all ops before 't' to complete. Status WaitForCompletedOps(TabletReplica* replica, Timestamp t, MonoDelta timeout) { const auto deadline = MonoTime::Now() + timeout; RETURN_NOT_OK_PREPEND( @@ -132,6 +144,9 @@ Status WaitForCompletedOps(TabletReplica* replica, Timestamp t, MonoDelta timeou class TxnParticipantITest : public KuduTest { public: + ~TxnParticipantITest() { + STLDeleteValues(&ts_map_); + } void SetUp() override { KuduTest::SetUp(); InternalMiniClusterOptions opts; @@ -139,16 +154,20 @@ class TxnParticipantITest : public KuduTest { cluster_.reset(new InternalMiniCluster(env_, std::move(opts))); ASSERT_OK(cluster_->Start()); NO_FATALS(SetUpTable()); + ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(), cluster_->messenger(), &ts_map_)); } // Creates a single-tablet replicated table. - void SetUpTable() { + void SetUpTable(string* table_name = nullptr) { TestWorkload w(cluster_.get()); w.Setup(); w.Start(); while (w.rows_inserted() < 1) { SleepFor(MonoDelta::FromMilliseconds(10)); } + if (table_name) { + *table_name = w.table_name(); + } w.StopAndJoin(); } @@ -172,6 +191,7 @@ class TxnParticipantITest : public KuduTest { protected: unique_ptr<InternalMiniCluster> cluster_; + unordered_map<string, TServerDetails*> ts_map_; }; // Test that participant ops only applied to followers via replication from @@ -225,10 +245,27 @@ TEST_F(TxnParticipantITest, TestReplicateParticipantOps) { } } +class ParticipantCopyITest : public TxnParticipantITest, + public ::testing::WithParamInterface<bool> { + public: + void SetUp() override { + // To test the behavior with WAL GC, encourage flushing so our WALs don't + // stay anchored for too long. + FLAGS_flush_threshold_mb = 1; + FLAGS_flush_threshold_secs = 1; + FLAGS_maintenance_manager_polling_interval_ms = 10; + // Additionally, make the WAL segments smaller to encourage more frequent + // roll-over onto WAL segments. 
+ FLAGS_log_segment_size_mb = 1; + NO_FATALS(TxnParticipantITest::SetUp()); + } +}; + // Test that participant ops are copied when performing a tablet copy, // resulting in identical transaction states on the new copy. -TEST_F(TxnParticipantITest, TestCopyParticipantOps) { - NO_FATALS(SetUpTable()); +TEST_P(ParticipantCopyITest, TestCopyParticipantOps) { + string table_name; + NO_FATALS(SetUpTable(&table_name)); constexpr const int kNumTxns = 10; constexpr const int kLeaderIdx = 0; @@ -258,6 +295,25 @@ TEST_F(TxnParticipantITest, TestCopyParticipantOps) { }); } + // If we're meant to GC WALs, insert rows to the tablet until a WAL GC op + // happens. + if (GetParam()) { + for (int i = 0; i < kNumTxns; i++) { + ASSERT_TRUE(leader_replica->tablet_metadata()->HasTxnMetadata(i)); + } + TestWorkload w(cluster_.get()); + w.set_table_name(table_name); + w.Setup(); + w.Start(); + auto gc_ops = leader_replica->tablet()->GetMetricEntity()->FindOrCreateHistogram( + &METRIC_log_gc_duration); + const auto initial_gcs = gc_ops->TotalCount(); + while (gc_ops->TotalCount() == initial_gcs) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + w.StopAndJoin(); + } + // Set ourselves up to make a copy. cluster_->mini_tablet_server(kDeadServerIdx)->Shutdown(); ASSERT_OK(cluster_->AddTabletServer()); @@ -272,20 +328,28 @@ TEST_F(TxnParticipantITest, TestCopyParticipantOps) { ASSERT_TRUE(new_ts->server()->tablet_manager()->LookupTablet(tablets[0], &r)); ASSERT_OK(r->WaitUntilConsensusRunning(kTimeout)); ASSERT_EQ(expected_txns, r->tablet()->txn_participant()->GetTxnsForTests()); + for (int i = 0; i < kNumTxns; i++) { + ASSERT_TRUE(r->tablet_metadata()->HasTxnMetadata(i)); + } }); } +INSTANTIATE_TEST_CASE_P(ShouldGCWals, ParticipantCopyITest, ::testing::Values(true, false)); // Test to ensure that the mechanisms built to allow snapshot scans to wait for // safe time advancement will actually wait for transactions to commit. 
TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) { const int kLeaderIdx = 0; vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx); + auto* leader_replica = replicas[kLeaderIdx]; auto* follower_replica = replicas[kLeaderIdx + 1]; auto* clock = leader_replica->clock(); const int64_t kTxnId = 1; ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN)); + const MonoDelta kAgreeTimeout = MonoDelta::FromSeconds(10); + const auto& tablet_id = leader_replica->tablet()->tablet_id(); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); // We should consistently be able to scan a bit in the future just by // waiting on leaders. Leader safe times move forward with its clock, while @@ -301,6 +365,7 @@ TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) { // reads, scans asking for a timestamp higher than the BEGIN_COMMIT op's // timestamp should wait. 
ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_COMMIT)); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); const auto before_finalize_ts = clock->Now(); Status s; s = WaitForCompletedOps(leader_replica, before_finalize_ts, kShortTimeout); @@ -325,6 +390,7 @@ TEST_F(TxnParticipantITest, TestWaitOnFinalizeCommit) { const auto commit_ts_val = clock->Now().value() + 10000; const auto commit_ts = Timestamp(commit_ts_val); ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, commit_ts_val)); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); ASSERT_OK(WaitForCompletedOps(leader_replica, before_finalize_ts, kShortTimeout)); ASSERT_OK(WaitForCompletedOps(follower_replica, before_finalize_ts, kShortTimeout)); ASSERT_OK(WaitForCompletedOps(leader_replica, commit_ts, kShortTimeout)); @@ -342,6 +408,9 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) { const int64_t kTxnId = 1; ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_TXN)); + const MonoDelta kAgreeTimeout = MonoDelta::FromSeconds(10); + const auto& tablet_id = leader_replica->tablet()->tablet_id(); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); const auto kShortTimeout = MonoDelta::FromMilliseconds(10); const auto before_commit_ts = clock->Now(); @@ -349,6 +418,7 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) { // reads, scans asking for a timestamp higher than the BEGIN_COMMIT op's // timestamp should wait. 
ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::BEGIN_COMMIT)); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); const auto before_abort_ts = clock->Now(); Status s = WaitForCompletedOps(leader_replica, before_abort_ts, kShortTimeout); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); @@ -364,6 +434,7 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) { // On followers, this will happen via heartbeats, so we need to wait to // heartbeat (wait 2x the heartbeat time to avoid flakiness). ASSERT_OK(RunOnReplica(leader_replica, kTxnId, ParticipantOpPB::ABORT_TXN)); + ASSERT_OK(WaitForServersToAgree(kAgreeTimeout, ts_map_, tablet_id, /*minimum_index*/1)); ASSERT_OK(WaitForCompletedOps(leader_replica, before_abort_ts, kShortTimeout)); SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2)); ASSERT_OK(WaitForCompletedOps(follower_replica, before_abort_ts, kShortTimeout)); diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto index 9c58636..00e28ff 100644 --- a/src/kudu/tablet/metadata.proto +++ b/src/kudu/tablet/metadata.proto @@ -77,6 +77,19 @@ enum TabletDataState { TABLET_DATA_TOMBSTONED = 3; } +// Metadata that indicates the state of a transaction. +message TxnMetadataPB { + // Whether the transaction was aborted. If true, 'commit_timestamp' must not + // be set. + optional bool aborted = 1; + + // The commit timestamp of the transaction. If set, 'aborted' must not be + // set. + optional int64 commit_timestamp = 2; + + // TODO(awong): add an owner field to this for uncommitted transactions. +} + // The super-block keeps track of the tablet data blocks. // A tablet contains one or more RowSets, which contain // a set of blocks (one for each column), a set of delta blocks @@ -150,6 +163,13 @@ message TabletSuperBlockPB { // The dimension label for tablet. Used by the master to determine load when // creating new tablet replicas based on dimension. 
optional string dimension_label = 18; + + // Map from txn ID to metadata associated with the transaction. This is + // updated on each metadata flush to reflect the current in-memory state of + // transactions. In between an in-memory state update and a flush, + // participant ops should be anchored to replay the updates upon restarting. + // TODO(awong): consider storing these separately from the superblock. + map<int64, TxnMetadataPB> txn_metadata = 20; } // Tablet states represent stages of a TabletReplica's object lifecycle and are diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc index 4251bd2..2241ad3 100644 --- a/src/kudu/tablet/ops/participant_op.cc +++ b/src/kudu/tablet/ops/participant_op.cc @@ -161,7 +161,7 @@ Status ParticipantOp::Start() { return Status::OK(); } -Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, CommitMsg** commit_msg) { +Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, Tablet* tablet) { const auto& op = request()->op(); const auto& op_type = request()->op().type(); Status s; @@ -170,26 +170,29 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, CommitMsg** c // metadata. When we begin validating write ops before committing, we'll // need to populate the response with errors. case ParticipantOpPB::BEGIN_TXN: { - txn_->BeginTransaction(op_id); + tablet->BeginTransaction(txn_.get(), op_id); break; } case ParticipantOpPB::BEGIN_COMMIT: { - // TODO(awong): Wait for all ops below this timestamp to complete. txn_->BeginCommit(op_id); + ReleaseMvccOpToTxn(); break; } case ParticipantOpPB::FINALIZE_COMMIT: { - txn_->FinalizeCommit(op_id, op.finalized_commit_timestamp()); - // NOTE: we may not have a commit op if we are bootstrapping. - // TODO(awong): consider not replaying the FINALIZE_COMMIT unless the - // BEGIN_COMMIT also needs to be replayed. 
+ DCHECK(op.has_finalized_commit_timestamp()); + const auto& commit_ts = op.finalized_commit_timestamp(); + tablet->CommitTransaction(txn_.get(), Timestamp(commit_ts), op_id); + // NOTE: we may not have a commit op if we are bootstrapping and we GCed + // the BEGIN_COMMIT op before flushing the finalized commit timestamp. if (txn_->commit_op()) { txn_->commit_op()->FinishApplying(); } break; } case ParticipantOpPB::ABORT_TXN: { - txn_->AbortTransaction(op_id); + tablet->AbortTransaction(txn_.get(), op_id); + // NOTE: we may not have a commit op if we are aborting before beginning + // to commit. if (txn_->commit_op()) { txn_->commit_op()->Abort(); } @@ -199,8 +202,6 @@ Status ParticipantOpState::PerformOp(const consensus::OpId& op_id, CommitMsg** c return Status::InvalidArgument("unknown op type"); } } - *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(pb_arena()); - (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP); return Status::OK(); } @@ -208,12 +209,9 @@ Status ParticipantOp::Apply(CommitMsg** commit_msg) { TRACE_EVENT0("op", "ParticipantOp::Apply"); TRACE("APPLY: Starting."); state_->tablet_replica()->tablet()->StartApplying(state_.get()); - CHECK_OK(state_->PerformOp(state()->op_id(), commit_msg)); - // If this is a BEGIN_COMMIT op, pass the commit's MVCC op to the - // transaction, keeping it open until the commit is finalized or aborted. 
- if (state_->request()->op().type() == ParticipantOpPB::BEGIN_COMMIT) { - state_->ReleaseMvccOpToTxn(); - } + CHECK_OK(state_->PerformOp(state()->op_id(), state_->tablet_replica()->tablet())); + *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena()); + (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP); TRACE("APPLY: Finished."); return Status::OK(); } @@ -222,15 +220,19 @@ void ParticipantOp::Finish(OpResult result) { auto txn_id = state_->request()->op().txn_id(); state_->ReleaseTxn(); TxnParticipant* txn_participant = state_->txn_participant_; + + // If the transaction is complete, get rid of the in-flight Txn. + txn_participant->ClearIfComplete(txn_id); + if (PREDICT_FALSE(result == Op::ABORTED)) { + // NOTE: The only way we end up with an init failure is if we ran a + // BEGIN_TXN op but aborted mid-way, leaving the Txn in the kInitialized + // state and no further ops attempting to drive the state change to kOpen. txn_participant->ClearIfInitFailed(txn_id); TRACE("FINISH: Op aborted"); return; } - DCHECK_EQ(result, Op::APPLIED); - // TODO(awong): when implementing transaction cleanup on participants, clean - // up finalized and aborted transactions here. TRACE("FINISH: Op applied"); } diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h index f4aa299..7c8bbd1 100644 --- a/src/kudu/tablet/ops/participant_op.h +++ b/src/kudu/tablet/ops/participant_op.h @@ -41,6 +41,7 @@ class OpId; } // namespace consensus namespace tablet { +class Tablet; class TabletReplica; // An OpState for an update to transaction participant state. @@ -72,9 +73,8 @@ class ParticipantOpState : public OpState { // // Anchors the given 'op_id' in the WAL, ensuring that subsequent bootstraps // of the tablet's WAL will leave the transaction in the appropriate state. - // Updates 'commit_msg' to include a commit message appropriate for this op - // using this op's arena. 
- Status PerformOp(const consensus::OpId& op_id, consensus::CommitMsg** commit_msg); + // Uses 'tablet' for this anchoring, and to update metadata. + Status PerformOp(const consensus::OpId& op_id, Tablet* tablet); // Releases the transaction and its lock. void ReleaseTxn(); diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index ab9b347..0bbda9b 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -209,8 +209,10 @@ METRIC_DEFINE_gauge_uint64(tablet, last_write_elapsed_time, "Seconds Since Last using kudu::MaintenanceManager; using kudu::clock::HybridClock; +using kudu::consensus::OpId; using kudu::fs::IOContext; using kudu::log::LogAnchorRegistry; +using kudu::log::MinLogIndexAnchorer; using std::endl; using std::make_shared; using std::ostream; @@ -256,6 +258,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata, mem_trackers_(tablet_id(), std::move(parent_mem_tracker)), next_mrs_id_(0), clock_(clock), + txn_participant_(metadata_), rowsets_flush_sem_(1), state_(kInitialized), last_write_time_(MonoTime::Now()), @@ -315,7 +318,7 @@ Tablet::~Tablet() { CHECK_EQ(expected_state, _local_state); \ } while (0) -Status Tablet::Open() { +Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids) { TRACE_EVENT0("tablet", "Tablet::Open"); RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized); @@ -325,6 +328,14 @@ Status Tablet::Open() { RowSetVector rowsets_opened; + // If we persisted the state of any transaction IDs before shutting down, + // initialize those that were in-flight here as kOpen. If there were any ops + // applied that didn't get persisted to the tablet metadata, the bootstrap + // process will replay those ops. 
+ for (const auto& txn_id : in_flight_txn_ids) { + txn_participant_.CreateOpenTransaction(txn_id, log_anchor_registry_.get()); + } + fs::IOContext io_context({ tablet_id() }); // open the tablet row-sets for (const shared_ptr<RowSetMetadata>& rowset_meta : metadata_->rowsets()) { @@ -1005,6 +1016,30 @@ Status Tablet::CheckHasNotBeenStopped(State* cur_state) const { return Status::OK(); } +void Tablet::BeginTransaction(Txn* txn, const OpId& op_id) { + unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), + Substitute("BEGIN_TXN-$0-$1", txn->txn_id(), txn))); + anchor->AnchorIfMinimum(op_id.index()); + metadata_->AddTxnMetadata(txn->txn_id(), std::move(anchor)); + txn->BeginTransaction(); +} + +void Tablet::CommitTransaction(Txn* txn, Timestamp commit_ts, const OpId& op_id) { + unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), + Substitute("FINALIZE_COMMIT-$0-$1", txn->txn_id(), txn))); + anchor->AnchorIfMinimum(op_id.index()); + metadata_->AddCommitTimestamp(txn->txn_id(), commit_ts, std::move(anchor)); + txn->FinalizeCommit(commit_ts.value()); +} + +void Tablet::AbortTransaction(Txn* txn, const OpId& op_id) { + unique_ptr<MinLogIndexAnchorer> anchor(new MinLogIndexAnchorer(log_anchor_registry_.get(), + Substitute("ABORT_TXN-$0-$1", txn->txn_id(), txn))); + anchor->AnchorIfMinimum(op_id.index()); + metadata_->AbortTransaction(txn->txn_id(), std::move(anchor)); + txn->AbortTransaction(); +} + Status Tablet::ApplyRowOperations(WriteOpState* op_state) { int num_ops = op_state->row_ops().size(); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index 191a917..227ceea 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -25,6 +25,7 @@ #include <mutex> #include <ostream> #include <string> +#include <unordered_set> #include <vector> #include <glog/logging.h> @@ -54,6 +55,7 @@ #include "kudu/util/status.h" namespace kudu { + class AlterTableTest; class 
ConstContiguousRow; class EncodedKey; @@ -66,6 +68,10 @@ class Timestamp; struct IterWithBounds; struct IteratorStats; +namespace consensus { +class OpId; +} // namespace consensus + namespace clock { class Clock; } // namespace clock @@ -111,9 +117,9 @@ class Tablet { ~Tablet(); - // Open the tablet. + // Open the tablet, initializing transactions for 'in_flight_txn_ids'. // Upon completion, the tablet enters the kBootstrapping state. - Status Open(); + Status Open(const std::unordered_set<int64_t>& in_flight_txn_ids = std::unordered_set<int64_t>{}); // Mark that the tablet has finished bootstrapping. // This transitions from kBootstrapping to kOpen state. @@ -172,6 +178,21 @@ class Tablet { RowOp* row_op, ProbeStats* stats) WARN_UNUSED_RESULT; + // Begins the transaction, recording its presence in the tablet metadata. + // Upon calling this, 'op_id' will be anchored until the metadata is flushed, + // using 'txn' as the anchor owner. + void BeginTransaction(Txn* txn, const consensus::OpId& op_id); + + // Commits the transaction, recording its commit timestamp in the tablet metadata. + // Upon calling this, 'op_id' will be anchored until the metadata is flushed, + // using 'txn' as the anchor owner. + void CommitTransaction(Txn* txn, Timestamp commit_ts, const consensus::OpId& op_id); + + // Aborts the transaction, recording the abort in the tablet metadata. + // Upon calling this, 'op_id' will be anchored until the metadata is flushed, + // using 'txn' as the anchor owner. + void AbortTransaction(Txn* txn, const consensus::OpId& op_id); + // Create a new row iterator which yields the rows as of the current MVCC // state of this tablet. // The returned iterator is not initialized. 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index 89dd6a7..a6298cf 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -25,6 +25,7 @@ #include <ostream> #include <string> #include <unordered_map> +#include <unordered_set> #include <utility> #include <vector> @@ -130,6 +131,7 @@ using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using kudu::rpc::ResultTracker; using kudu::tserver::AlterSchemaRequestPB; +using kudu::tserver::ParticipantOpPB; using kudu::tserver::WriteRequestPB; using kudu::tserver::WriteResponsePB; using std::map; @@ -456,6 +458,11 @@ class TabletBootstrap { // Snapshot of which stores were flushed prior to restart. FlushedStoresSnapshot flushed_stores_; + // Transactions that were persisted as being in-flight (neither finalized or + // aborted) and completed prior to restart. + std::unordered_set<int64_t> in_flight_txn_ids_; + std::unordered_set<int64_t> terminal_txn_ids_; + DISALLOW_COPY_AND_ASSIGN(TabletBootstrap); }; @@ -604,6 +611,7 @@ Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet, } RETURN_NOT_OK(flushed_stores_.InitFrom(*tablet_meta_.get())); + tablet_meta_->GetTxnIds(&in_flight_txn_ids_, &terminal_txn_ids_); bool has_blocks; RETURN_NOT_OK(OpenTablet(&has_blocks)); @@ -663,7 +671,7 @@ Status TabletBootstrap::OpenTablet(bool* has_blocks) { // doing nothing for now except opening a tablet locally. 
{ SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet"); - RETURN_NOT_OK(tablet->Open()); + RETURN_NOT_OK(tablet->Open(in_flight_txn_ids_)); } *has_blocks = tablet->num_rowsets() != 0; tablet_ = std::move(tablet); @@ -1545,22 +1553,42 @@ Status TabletBootstrap::PlayChangeConfigRequest(const IOContext* /*io_context*/, Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_context*/, ReplicateMsg* replicate_msg, const CommitMsg& commit_msg) { + // When replaying participant ops: + // - If we've persisted completed transactions (i.e. aborted or committed), + // we can skip replaying all participant ops for that transaction. + // - If we otherwise have persisted that a transaction exists, it was + // persisted on-disk as being in-flight before restarting, we should have + // opened a transaction before starting to replay, and we should replay all + // participant ops for that transaction. + // - If we have no record of a transaction persisted, it was not persisted + // on-disk as being in-flight, and we must replay all participant ops. ParticipantOpState op_state(tablet_replica_.get(), tablet_->txn_participant(), &replicate_msg->participant_request()); - op_state.AcquireTxnAndLock(); - SCOPED_CLEANUP({ - op_state.ReleaseTxn(); - }); - LogEntryPB& commit_entry = *google::protobuf::Arena::CreateMessage<LogEntryPB>( - op_state.pb_arena()); - commit_entry.set_type(log::COMMIT); - CommitMsg* new_commit = commit_entry.mutable_commit(); - new_commit->CopyFrom(commit_msg); - // NOTE: don't bother validating the current state of the op. Presumably that - // happened the first time this op was written. 
- tablet_->StartApplying(&op_state); - RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id(), &new_commit)); + const auto& op_type = op_state.request()->op().type(); + if (ContainsKey(terminal_txn_ids_, op_state.txn_id())) { + return AppendCommitMsg(commit_msg); + } + bool persisted_in_flight = ContainsKey(in_flight_txn_ids_, op_state.txn_id()); + if ((persisted_in_flight && op_type != ParticipantOpPB::BEGIN_TXN) || + !persisted_in_flight) { + op_state.mutable_op_id()->CopyFrom(replicate_msg->id()); + op_state.set_timestamp(Timestamp(replicate_msg->timestamp())); + op_state.AcquireTxnAndLock(); + SCOPED_CLEANUP({ + op_state.ReleaseTxn(); + }); + // Start an MVCC op to track the commit if we're beginning to commit. + tablet_->StartOp(&op_state); + + // Start applying the commit's MVCC op if we're finalizing the commit. + // PerformOp() will take care of finishing applying the op. + tablet_->StartApplying(&op_state); + + // NOTE: don't bother validating the current state of the op. Presumably that + // happened the first time this op was written. 
+ RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id(), tablet_.get())); + } return AppendCommitMsg(commit_msg); } diff --git a/src/kudu/tablet/tablet_metadata-test.cc b/src/kudu/tablet/tablet_metadata-test.cc index 4dd511f..40d2f2e 100644 --- a/src/kudu/tablet/tablet_metadata-test.cc +++ b/src/kudu/tablet/tablet_metadata-test.cc @@ -22,6 +22,8 @@ #include <memory> #include <ostream> #include <string> +#include <unordered_map> +#include <unordered_set> #include <boost/optional/optional.hpp> #include <gflags/gflags.h> @@ -31,8 +33,11 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/schema.h" +#include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol-test-util.h" +#include "kudu/consensus/log_anchor_registry.h" #include "kudu/fs/block_id.h" +#include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/tablet/local_tablet_writer.h" @@ -49,9 +54,12 @@ DEFINE_int64(test_row_set_count, 1000, ""); DEFINE_int64(test_block_count_per_rs, 1000, ""); +using kudu::log::LogAnchorRegistry; +using kudu::log::MinLogIndexAnchorer; using std::map; using std::shared_ptr; using std::unique_ptr; +using std::unordered_set; namespace kudu { namespace tablet { @@ -220,5 +228,72 @@ TEST_F(TestTabletMetadata, BenchmarkCollectBlockIds) { } } +TEST_F(TestTabletMetadata, TestTxnMetadata) { + constexpr const char* kOwner = "txn"; + const Timestamp kDummyTimestamp = Timestamp(1337); + scoped_refptr<LogAnchorRegistry> registry(new LogAnchorRegistry); + TabletMetadata* meta = harness_->tablet()->metadata(); + const auto make_anchor = [&] () { + return unique_ptr<MinLogIndexAnchorer>(new MinLogIndexAnchorer(registry.get(), kOwner)); + }; + int64_t kCommittedTxnId = 1; + int64_t kAbortedTxnId = 2; + int64_t kInFlightTxnId = 3; + meta->AddTxnMetadata(kCommittedTxnId, make_anchor()); + meta->AddCommitTimestamp(kCommittedTxnId, kDummyTimestamp, make_anchor()); + ASSERT_EQ(1, 
meta->GetTxnMetadata().size()); + + meta->AddTxnMetadata(kAbortedTxnId, make_anchor()); + meta->AbortTransaction(kAbortedTxnId, make_anchor()); + ASSERT_EQ(2, meta->GetTxnMetadata().size()); + + meta->AddTxnMetadata(kInFlightTxnId, make_anchor()); + + // Validate the transactions' fields. + const auto validate_txn_metas = [&] (TabletMetadata* meta) { + auto txn_metas = meta->GetTxnMetadata(); + ASSERT_EQ(3, txn_metas.size()); + ASSERT_TRUE(meta->HasTxnMetadata(kCommittedTxnId)); + ASSERT_TRUE(meta->HasTxnMetadata(kAbortedTxnId)); + ASSERT_TRUE(meta->HasTxnMetadata(kInFlightTxnId)); + + const auto& committed_txn = FindOrDie(txn_metas, kCommittedTxnId); + ASSERT_FALSE(committed_txn->aborted()); + ASSERT_NE(boost::none, committed_txn->commit_timestamp()); + ASSERT_EQ(kDummyTimestamp.value(), *committed_txn->commit_timestamp()); + + const auto& aborted_txn = FindOrDie(txn_metas, kAbortedTxnId); + ASSERT_TRUE(aborted_txn->aborted()); + ASSERT_EQ(boost::none, aborted_txn->commit_timestamp()); + + const auto& in_flight_txn = FindOrDie(txn_metas, kInFlightTxnId); + ASSERT_FALSE(in_flight_txn->aborted()); + ASSERT_EQ(boost::none, in_flight_txn->commit_timestamp()); + + unordered_set<int64_t> in_flight_txn_ids; + unordered_set<int64_t> terminal_txn_ids; + meta->GetTxnIds(&in_flight_txn_ids, &terminal_txn_ids); + ASSERT_EQ(1, in_flight_txn_ids.size()); + ASSERT_EQ(2, terminal_txn_ids.size()); + ASSERT_TRUE(ContainsKey(in_flight_txn_ids, kInFlightTxnId)); + ASSERT_TRUE(ContainsKey(terminal_txn_ids, kAbortedTxnId)); + ASSERT_TRUE(ContainsKey(terminal_txn_ids, kCommittedTxnId)); + }; + NO_FATALS(validate_txn_metas(meta)); + + // Validate the superblock fields are set. + TabletSuperBlockPB superblock_pb; + ASSERT_OK(meta->ToSuperBlock(&superblock_pb)); + ASSERT_EQ(3, superblock_pb.txn_metadata_size()); + + // Flush and reload the metadata. 
+ ASSERT_OK(meta->Flush()); + scoped_refptr<TabletMetadata> new_meta; + ASSERT_OK(TabletMetadata::Load(harness_->fs_manager(), + harness_->tablet()->tablet_id(), + &new_meta)); + NO_FATALS(validate_txn_metas(new_meta.get())); +} + } // namespace tablet } // namespace kudu diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 095c681..4d9405e 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -23,15 +23,20 @@ #include <ostream> #include <string> #include <type_traits> +#include <unordered_map> +#include <utility> #include <boost/optional/optional.hpp> #include <gflags/gflags.h> +#include <google/protobuf/stubs/port.h> #include "kudu/common/common.pb.h" #include "kudu/common/schema.h" +#include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/opid_util.h" +#include "kudu/consensus/log_anchor_registry.h" #include "kudu/fs/block_id.h" #include "kudu/fs/block_manager.h" #include "kudu/fs/data_dirs.h" @@ -64,11 +69,14 @@ using kudu::consensus::MinimumOpId; using kudu::consensus::OpId; using kudu::fs::BlockManager; using kudu::fs::BlockDeletionTransaction; +using kudu::log::MinLogIndexAnchorer; using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using std::memory_order_relaxed; using std::shared_ptr; using std::string; +using std::unordered_map; +using std::unordered_set; using std::unique_ptr; using std::vector; using strings::Substitute; @@ -488,6 +496,19 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock) if (superblock.has_table_type() && superblock.table_type() != TableTypePB::DEFAULT_TABLE) { table_type_ = superblock.table_type(); } + + std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metas; + for (const auto& txn_id_and_metadata : superblock.txn_metadata()) { + const auto& txn_meta = txn_id_and_metadata.second; + EmplaceOrDie(&txn_metas, 
txn_id_and_metadata.first, + new TxnMetadata( + txn_meta.has_aborted() && txn_meta.aborted(), + txn_meta.has_commit_timestamp() ? + boost::make_optional(txn_meta.commit_timestamp()) : + boost::none + )); + } + txn_metadata_by_txn_id_ = std::move(txn_metas); } // Now is a good time to clean up any orphaned blocks that may have been @@ -573,6 +594,7 @@ Status TabletMetadata::Flush() { MutexLock l_flush(flush_lock_); BlockIdContainer orphaned; TabletSuperBlockPB pb; + vector<unique_ptr<MinLogIndexAnchorer>> anchors_needing_flush; { std::lock_guard<LockType> l(data_lock_); CHECK_GE(num_flush_pins_, 0); @@ -592,12 +614,17 @@ Status TabletMetadata::Flush() { // want to accidentally delete those blocks before that next metadata update // is persisted. See KUDU-701 for details. orphaned.assign(orphaned_blocks_.begin(), orphaned_blocks_.end()); + anchors_needing_flush = std::move(anchors_needing_flush_); } pre_flush_callback_(); RETURN_NOT_OK(ReplaceSuperBlockUnlocked(pb)); TRACE("Metadata flushed"); l_flush.Unlock(); + // Now that we've flushed, we can unanchor our WALs by destructing our + // anchors. + anchors_needing_flush.clear(); + // Now that the superblock is written, try to delete the orphaned blocks. 
// // If we crash just before the deletion, we'll retry when reloading from @@ -706,6 +733,19 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, meta->ToProtobuf(pb.add_rowsets()); } + for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) { + TxnMetadataPB meta_pb; + const auto& txn_meta = txn_id_and_metadata.second; + const auto& commit_ts = txn_meta->commit_timestamp(); + if (commit_ts) { + meta_pb.set_commit_timestamp(*commit_ts); + } + if (txn_meta->aborted()) { + meta_pb.set_aborted(true); + } + InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, std::move(meta_pb)); + } + DCHECK(schema_->has_column_ids()); RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()), "Couldn't serialize schema into superblock"); @@ -754,6 +794,60 @@ Status TabletMetadata::CreateRowSet(shared_ptr<RowSetMetadata>* rowset) { return Status::OK(); } +void TabletMetadata::AddTxnMetadata(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) { + std::lock_guard<LockType> l(data_lock_); + EmplaceOrDie(&txn_metadata_by_txn_id_, txn_id, new TxnMetadata()); + anchors_needing_flush_.emplace_back(std::move(log_anchor)); +} + +void TabletMetadata::AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp, + unique_ptr<MinLogIndexAnchorer> log_anchor) { + std::lock_guard<LockType> l(data_lock_); + auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id); + CHECK(txn_metadata); + txn_metadata->set_commit_timestamp(commit_timestamp.value()); + anchors_needing_flush_.emplace_back(std::move(log_anchor)); +} + +void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnchorer> log_anchor) { + std::lock_guard<LockType> l(data_lock_); + auto txn_metadata = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id); + CHECK(txn_metadata); + txn_metadata->set_aborted(); + anchors_needing_flush_.emplace_back(std::move(log_anchor)); +} + +bool TabletMetadata::HasTxnMetadata(int64_t txn_id) { + std::lock_guard<LockType> 
l(data_lock_); + return ContainsKey(txn_metadata_by_txn_id_, txn_id); +} + +void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids, + unordered_set<int64_t>* terminal_txn_ids) { + std::unordered_set<int64_t> in_flights; + std::unordered_set<int64_t> terminals; + std::lock_guard<LockType> l(data_lock_); + for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) { + const auto& txn_meta = txn_id_and_metadata.second; + if (txn_meta->commit_timestamp() || txn_meta->aborted()) { + if (terminal_txn_ids) { + EmplaceOrDie(&terminals, txn_id_and_metadata.first); + } + } else { + EmplaceOrDie(&in_flights, txn_id_and_metadata.first); + } + } + *in_flight_txn_ids = std::move(in_flights); + if (terminal_txn_ids) { + *terminal_txn_ids = std::move(terminals); + } +} + +unordered_map<int64_t, scoped_refptr<TxnMetadata>> TabletMetadata::GetTxnMetadata() const { + std::lock_guard<LockType> l(data_lock_); + return txn_metadata_by_txn_id_; +} + const RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) const { for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) { if (rowset_meta->id() == id) { diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index 8b26887..b6eeae8 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -19,10 +19,14 @@ #include <atomic> #include <cstdint> #include <memory> +#include <mutex> #include <string> +#include <unordered_map> #include <unordered_set> +#include <utility> #include <vector> +#include <boost/none_t.hpp> #include <boost/optional/optional.hpp> #include <glog/logging.h> @@ -43,10 +47,15 @@ namespace kudu { class BlockIdPB; class FsManager; class Schema; +class Timestamp; namespace consensus { class OpId; -} +} // namespace consensus + +namespace log { +class MinLogIndexAnchorer; +} // namespace log namespace tablet { @@ -57,6 +66,42 @@ typedef std::unordered_set<int64_t> RowSetMetadataIds; extern const int64_t kNoDurableMemStore; +// 
Encapsulates the persistent state associated with a transaction. +class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> { + public: + TxnMetadata(bool aborted = false, + boost::optional<int64_t> commit_ts = boost::none) + : aborted_(aborted), + commit_timestamp_(std::move(commit_ts)) {} + void set_aborted() { + std::lock_guard<simple_spinlock> l(lock_); + CHECK(boost::none == commit_timestamp_); + aborted_ = true; + } + void set_commit_timestamp(int64_t commit_ts) { + std::lock_guard<simple_spinlock> l(lock_); + CHECK(boost::none == commit_timestamp_); + commit_timestamp_ = commit_ts; + } + + bool aborted() const { + std::lock_guard<simple_spinlock> l(lock_); + return aborted_; + } + boost::optional<int64_t> commit_timestamp() const { + std::lock_guard<simple_spinlock> l(lock_); + return commit_timestamp_; + } + + private: + friend class RefCountedThreadSafe<TxnMetadata>; + ~TxnMetadata() = default; + + mutable simple_spinlock lock_; + bool aborted_; + boost::optional<int64_t> commit_timestamp_; +}; + // Manages the "blocks tracking" for the specified tablet. // // TabletMetadata is owned by the Tablet. As new blocks are written to store @@ -230,6 +275,33 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { // calls to do so. Status CreateRowSet(std::shared_ptr<RowSetMetadata>* rowset); + // Records the fact that the given transaction ID has been started, adopting + // the anchor until the metadata is flushed. The transaction must not already + // have metadata associated with it. + void AddTxnMetadata(int64_t txn_id, std::unique_ptr<log::MinLogIndexAnchorer> log_anchor); + + // Records the fact that the transaction was committed at the given + // timestamp, adopting the anchor until the metadata is flushed. The + // transaction must already have metadata associated with it. + // + // TODO(awong): implement a way to remove timestamps once all relevant + // transactional mutations have been merged with the rest of the tablet.
+ void AddCommitTimestamp(int64_t txn_id, Timestamp commit_timestamp, + std::unique_ptr<log::MinLogIndexAnchorer> log_anchor); + + // Adds an abort bit to the transaction metadata, adopting the anchor until + // the metadata is flushed. The transaction must already have metadata + // associated with it. + void AbortTransaction(int64_t txn_id, std::unique_ptr<log::MinLogIndexAnchorer> log_anchor); + + // Returns whether a given transaction has metadata. + bool HasTxnMetadata(int64_t txn_id); + + // Returns the transaction IDs that were persisted as being in-flight or + // terminal. + void GetTxnIds(std::unordered_set<int64_t>* in_flight_txn_ids, + std::unordered_set<int64_t>* terminal_txn_ids = nullptr); + const RowSetMetadataVector& rowsets() const { return rowsets_; } FsManager *fs_manager() const { return fs_manager_; } @@ -272,6 +344,8 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { RowSetMetadata *GetRowSetForTests(int64_t id); + std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> GetTxnMetadata() const; + // Return standard "T xxx P yyy" log prefix. std::string LogPrefix() const; @@ -372,6 +446,13 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { int64_t last_durable_mrs_id_; + // Log anchors that are waiting on a metadata flush to be unanchored. + std::vector<std::unique_ptr<log::MinLogIndexAnchorer>> anchors_needing_flush_; + + // Commit timestamps for transactions whose mutations have not yet been + // merged with main state. + std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metadata_by_txn_id_;; + // The current schema version. This is owned by this class. // We don't use unique_ptr so that we can do an atomic swap. 
Schema* schema_; diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc index 44ded0e..02c9287 100644 --- a/src/kudu/tablet/txn_participant-test.cc +++ b/src/kudu/tablet/txn_participant-test.cc @@ -49,6 +49,7 @@ #include "kudu/tablet/ops/op_tracker.h" #include "kudu/tablet/ops/participant_op.h" #include "kudu/tablet/tablet.h" +#include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_replica-test-base.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tablet/txn_participant-test-util.h" @@ -140,8 +141,8 @@ class TxnParticipantTest : public TabletReplicaTestBase { // Writes an op to the WAL, rolls over onto a new WAL segment, and flushes // the MRS, leaving us with a new WAL segment that should be GC-able unless // previous WAL segments are anchored. - Status WriteRolloverAndFlush(int* current_key) { - RETURN_NOT_OK(Write(*current_key++)); + Status WriteRolloverAndFlush(int key) { + RETURN_NOT_OK(Write(key)); RETURN_NOT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); RETURN_NOT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); return tablet_replica_->tablet()->Flush(); @@ -378,7 +379,7 @@ TEST_F(TxnParticipantTest, TestReplayParticipantOps) { ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ { kTxnId, Txn::kCommitted, kDummyCommitTimestamp } }), txn_participant()->GetTxnsForTests()); - ASSERT_OK(RestartReplica()); + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ { kTxnId, Txn::kCommitted, kDummyCommitTimestamp } }), txn_participant()->GetTxnsForTests()); @@ -396,12 +397,17 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) { ParticipantResponsePB resp; ASSERT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op, kDummyCommitTimestamp, &resp)); - ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); - int64_t log_index = -1; - tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index); - 
ASSERT_EQ(++expected_index, log_index); + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + expected_index++; + if (op == ParticipantOpPB::BEGIN_COMMIT) { + ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + int64_t log_index = -1; + tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index); + ASSERT_EQ(expected_index, log_index); + } else { + ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + } } - ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(txn_id)); ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); }; NO_FATALS(check_participant_ops_are_anchored(1, { @@ -416,76 +422,165 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) { })); } -// Test that participant ops are anchored, the anchors are updated as a -// transaction's state gets updated. -TEST_F(TxnParticipantTest, TestParticipantOpsAnchorWALs) { +// Test that participant ops result in tablet metadata updates that can survive +// restarts, and that the appropriate anchors are in place as we progress +// through a transaction's life cycle. +TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) { const int64_t kTxnId = 1; - // First, perform some initial participant ops and roll the WAL segments so - // there are some candidates for WAL GC. + // First, do a sanity check that there's nothing GCable. + int64_t gcable_size; + ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); + ASSERT_EQ(0, gcable_size); + + // Perform some initial participant ops and roll the WAL segments so there + // are some candidates for WAL GC. 
ParticipantResponsePB resp; ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN, kDummyCommitTimestamp, &resp)); ASSERT_FALSE(resp.has_error()); + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); + ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); - // Write and flush some ops that would otherwise lead to GC-able WAL - // segments. Since there is an anchored participant op in the WAL before - // these writes, the tablet should not be GC-able. + // We can't GC down to 0 segments, so write some rows and roll onto new + // segments so we have a segment to GC. int current_key = 0; - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - int64_t gcable_size; + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + + // We should be able to GC the WALs that have the BEGIN_TXN op. + ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); + ASSERT_GT(gcable_size, 0); + ASSERT_OK(tablet_replica_->RunLogGC()); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_EQ(0, gcable_size); - // WAL GC should proceed to clear out ops for both the transaction and the - // inserts. + // At this point, we have a single segment with some writes in it. + // Write and flush a BEGIN_COMMIT op. Once we GC, our WAL will start on this + // op, and WALs should be anchored until the commit is finalized, regardless + // of whether there are more segments.
ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT, kDummyCommitTimestamp, &resp)); ASSERT_FALSE(resp.has_error()); - ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT, - kDummyCommitTimestamp, &resp)); - ASSERT_FALSE(resp.has_error()); + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed()); + ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests()); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_GT(gcable_size, 0); - ASSERT_OK(tablet_replica_->RunLogGC()); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_EQ(0, gcable_size); - // Ensure the transaction bootstraps to the expected state. - // NOTE: we need to reset the tablet here to reset the TxnParticipant. - // Otherwise, we might start the replica with a LogAnchor already registered. + // Even if we write and roll over, we shouldn't be able to GC because of the + // anchor. + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); + ASSERT_EQ(0, gcable_size); + ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); ASSERT_OK(RestartReplica(/*reset_tablet*/true)); ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ - { kTxnId, Txn::kCommitted, kDummyCommitTimestamp } + { kTxnId, Txn::kCommitInProgress, -1 } }), txn_participant()->GetTxnsForTests()); - // Roll onto new WAL segments and add more segments so we can get to a state - // without any transaction ops in the WALs. - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - ASSERT_OK(WriteRolloverAndFlush(&current_key)); + // Once we finalize the commit, the BEGIN_COMMIT anchor should be released.
+ ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); - // While the transaction still exists, we shouldn't GC anything. + // Because we rebuilt the WAL, we only have one segment and thus shouldn't be + // able to GC anything until we add more segments. ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_EQ(0, gcable_size); - - // Once we cull the transaction state in memory, we should be left with no - // trace of the transaction. - ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(kTxnId)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_GT(gcable_size, 0); - ASSERT_OK(tablet_replica_->RunLogGC()); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); ASSERT_EQ(0, gcable_size); - // Do a final check that we bootstrap to the expected state (i.e. the - // transaction is culled). + // Ensure the transaction bootstraps to the expected state. ASSERT_OK(RestartReplica(/*reset_tablet*/true)); - ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty()); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kCommitted, kDummyCommitTimestamp } + }), txn_participant()->GetTxnsForTests()); +} + +class MetadataFlushTxnParticipantTest : public TxnParticipantTest, + public ::testing::WithParamInterface<bool> {}; + +// Test rebuilding transaction state from the WALs and metadata. 
+TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) { + const bool should_flush = GetParam(); + const int64_t kTxnId = 1; + ParticipantResponsePB resp; + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + if (should_flush) { + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + } + + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kOpen, -1 } + }), txn_participant()->GetTxnsForTests()); + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + // NOTE: BEGIN_COMMIT ops don't anchor on metadata flush, so don't bother + // flushing. + + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kCommitInProgress, -1 } + }), txn_participant()->GetTxnsForTests()); + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + if (should_flush) { + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + } + + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kCommitted, kDummyCommitTimestamp } + }), txn_participant()->GetTxnsForTests()); + + // Now perform the same validation but for a transaction that gets aborted. 
+ const int64_t kAbortedTxnId = 2; + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kAbortedTxnId, ParticipantOpPB::BEGIN_TXN, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + if (should_flush) { + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + } + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }, + { kAbortedTxnId, Txn::kOpen, -1 } + }), txn_participant()->GetTxnsForTests()); + ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kAbortedTxnId, ParticipantOpPB::ABORT_TXN, + kDummyCommitTimestamp, &resp)); + ASSERT_FALSE(resp.has_error()); + if (should_flush) { + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + } + ASSERT_OK(RestartReplica(/*reset_tablet*/true)); + ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ + { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }, + { kAbortedTxnId, Txn::kAborted, -1 } + }), txn_participant()->GetTxnsForTests()); } +INSTANTIATE_TEST_CASE_P(ShouldFlushMetadata, MetadataFlushTxnParticipantTest, + ::testing::Values(true, false)); // Similar to the above test, but checking that in-flight ops anchor the WALs. TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) { @@ -512,8 +607,8 @@ TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) { // Create some WAL segments to ensure some would-be-GC-able segments. int current_key = 0; - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - ASSERT_OK(WriteRolloverAndFlush(&current_key)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); // Our participant op is still pending, and nothing should be GC-able. ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests()); @@ -522,29 +617,31 @@ TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) { ASSERT_EQ(0, gcable_size); // Finish applying the participant op and proceed to completion.
+ // Even with more segments added, the WAL should be anchored until we flush + // the tablet metadata to include the transaction. apply_continue.CountDown(); latch.Wait(); + ASSERT_FALSE(resp.has_error()); + ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); + ASSERT_OK(tablet_replica_->tablet_metadata()->Flush()); + ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests()); - // Even though we've completed the op, the replicate message should still be - // anchored while the in-memory transaction state exists on this participant. + // Add some segments to ensure there are enough segments to GC. + ASSERT_OK(WriteRolloverAndFlush(current_key++)); + ASSERT_OK(WriteRolloverAndFlush(current_key++)); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); - ASSERT_EQ(0, gcable_size); - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - ASSERT_OK(WriteRolloverAndFlush(&current_key)); - ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); - ASSERT_EQ(0, gcable_size); + ASSERT_GT(gcable_size, 0); - // The moment we update the in-memory state, we should be able to GC. - ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT, - kDummyCommitTimestamp, &resp)); - ASSERT_FALSE(resp.has_error()); + // Now that we've completed the op, we can get rid of the WAL segments that + // had the participant op. + ASSERT_OK(tablet_replica_->RunLogGC()); ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size)); - ASSERT_GT(gcable_size, 0); + ASSERT_EQ(0, gcable_size); // As a sanity check, ensure we get to the expected state if we reboot.
ASSERT_OK(RestartReplica(/*reset_tablet*/true)); ASSERT_EQ(vector<TxnParticipant::TxnEntry>({ - { kTxnId, Txn::kCommitInProgress, -1 } + { kTxnId, Txn::kOpen, -1 } }), txn_participant()->GetTxnsForTests()); } diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc index 7424c7b..eebf2be 100644 --- a/src/kudu/tablet/txn_participant.cc +++ b/src/kudu/tablet/txn_participant.cc @@ -24,6 +24,8 @@ #include <utility> #include <vector> +#include <boost/optional/optional.hpp> + #include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" @@ -36,7 +38,7 @@ namespace kudu { namespace tablet { Txn::~Txn() { - CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&log_anchor_)); + CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&begin_commit_anchor_)); // As a sanity check, make sure our state makes sense: if we have an MVCC op // for commit, we should have started to commit. if (commit_op_) { @@ -51,11 +53,19 @@ void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock) { *txn_lock = std::move(l); } +void TxnParticipant::CreateOpenTransaction(int64_t txn_id, + LogAnchorRegistry* log_anchor_registry) { + std::lock_guard<simple_spinlock> l(lock_); + EmplaceOrDie(&txns_, txn_id, new Txn(txn_id, log_anchor_registry, + tablet_metadata_, Txn::kOpen)); +} + scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id, LogAnchorRegistry* log_anchor_registry) { // TODO(awong): add a 'user' field to these transactions. 
std::lock_guard<simple_spinlock> l(lock_); - return LookupOrInsertNewSharedPtr(&txns_, txn_id, txn_id, log_anchor_registry); + return LookupOrInsertNewSharedPtr(&txns_, txn_id, txn_id, log_anchor_registry, + tablet_metadata_); } void TxnParticipant::ClearIfInitFailed(int64_t txn_id) { @@ -68,7 +78,7 @@ void TxnParticipant::ClearIfInitFailed(int64_t txn_id) { } } -bool TxnParticipant::ClearIfCompleteForTests(int64_t txn_id) { +bool TxnParticipant::ClearIfComplete(int64_t txn_id) { std::lock_guard<simple_spinlock> l(lock_); Txn* txn = FindPointeeOrNull(txns_, txn_id); // NOTE: If this is the only reference to the transaction, we can forego @@ -95,6 +105,46 @@ vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const { }); } } + // Include any transactions that are in a terminal state by first updating + // any now-terminal transactions that we already created entries for above. + auto txn_metas = tablet_metadata_->GetTxnMetadata(); + for (auto& txn_entry : txns) { + auto* txn_meta = FindPointeeOrNull(txn_metas, txn_entry.txn_id); + if (txn_meta) { + txn_metas.erase(txn_entry.txn_id); + if (txn_meta->aborted()) { + txn_entry.state = Txn::kAborted; + txn_entry.commit_timestamp = -1; + continue; + } + const auto& commit_ts = txn_meta->commit_timestamp(); + if (commit_ts) { + txn_entry.state = Txn::kCommitted; + txn_entry.commit_timestamp = *commit_ts; + continue; + } + } + } + // Create entries for the rest of the terminal transactions. 
+ for (const auto& id_and_meta : txn_metas) { + const auto& txn_id = id_and_meta.first; + const auto& txn_meta = id_and_meta.second; + TxnEntry txn_entry; + txn_entry.txn_id = txn_id; + if (txn_meta->aborted()) { + txn_entry.state = Txn::kAborted; + txn_entry.commit_timestamp = -1; + txns.emplace_back(std::move(txn_entry)); + continue; + } + const auto& commit_ts = txn_meta->commit_timestamp(); + if (commit_ts) { + txn_entry.state = Txn::kCommitted; + txn_entry.commit_timestamp = *commit_ts; + txns.emplace_back(std::move(txn_entry)); + continue; + } + } std::sort(txns.begin(), txns.end(), [] (const TxnEntry& a, const TxnEntry& b) { return a.txn_id < b.txn_id; }); return txns; diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h index eb7b741..795e712 100644 --- a/src/kudu/tablet/txn_participant.h +++ b/src/kudu/tablet/txn_participant.h @@ -33,6 +33,7 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/mvcc.h" +#include "kudu/tablet/tablet_metadata.h" #include "kudu/util/locks.h" #include "kudu/util/rw_semaphore.h" #include "kudu/util/status.h" @@ -91,10 +92,12 @@ class Txn : public RefCountedThreadSafe<Txn> { // contains the participant op that replays to the transaction's current // in-memory state is not GCed, allowing us to rebuild this transaction's // in-memory state upon rebooting a server. - Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry) + Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry, + scoped_refptr<TabletMetadata> tablet_metadata, State init_state = kInitializing) : txn_id_(txn_id), log_anchor_registry_(log_anchor_registry), - state_(kInitializing), + tablet_metadata_(std::move(tablet_metadata)), + state_(init_state), commit_timestamp_(-1) {} ~Txn(); @@ -108,6 +111,11 @@ class Txn : public RefCountedThreadSafe<Txn> { // replicating a participant op. 
Status ValidateBeginTransaction() const { DCHECK(state_lock_.is_locked()); + if (PREDICT_FALSE(tablet_metadata_->HasTxnMetadata(txn_id_))) { + return Status::IllegalState( + strings::Substitute("Transaction metadata for transaction $0 already exists", + txn_id_)); + } if (PREDICT_FALSE(state_ != kInitializing)) { return Status::IllegalState( strings::Substitute("Cannot begin transaction in state: $0", @@ -149,26 +157,23 @@ class Txn : public RefCountedThreadSafe<Txn> { // Applies the given state transitions. Should be called while holding the // state lock in write mode after successfully replicating a participant op. - void BeginTransaction(const consensus::OpId& op_id) { - CHECK_OK(log_anchor_registry_->RegisterOrUpdate( - op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_)); + void BeginTransaction() { SetState(kOpen); } void BeginCommit(const consensus::OpId& op_id) { CHECK_OK(log_anchor_registry_->RegisterOrUpdate( - op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_)); + op_id.index(), strings::Substitute("BEGIN_COMMIT-$0-$1", txn_id_, this), + &begin_commit_anchor_)); SetState(kCommitInProgress); } - void FinalizeCommit(const consensus::OpId& op_id, int64_t finalized_commit_timestamp) { - CHECK_OK(log_anchor_registry_->RegisterOrUpdate( - op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_)); - SetState(kCommitted); + void FinalizeCommit(int64_t finalized_commit_timestamp) { commit_timestamp_ = finalized_commit_timestamp; + SetState(kCommitted); + log_anchor_registry_->UnregisterIfAnchored(&begin_commit_anchor_); } - void AbortTransaction(const consensus::OpId& op_id) { - CHECK_OK(log_anchor_registry_->RegisterOrUpdate( - op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_)); + void AbortTransaction() { SetState(kAborted); + log_anchor_registry_->UnregisterIfAnchored(&begin_commit_anchor_); } // Simple accessors for state. 
No locks are required to call these. @@ -178,6 +183,9 @@ class Txn : public RefCountedThreadSafe<Txn> { int64_t commit_timestamp() const { return commit_timestamp_; } + int64_t txn_id() const { + return txn_id_; + } void SetCommitOp(std::unique_ptr<ScopedOp> commit_op) { DCHECK(nullptr == commit_op_.get()); @@ -200,6 +208,13 @@ class Txn : public RefCountedThreadSafe<Txn> { // Returns an error if the transaction has not finished initializing. Status CheckFinishedInitializing() const { if (PREDICT_FALSE(state_ == kInitializing)) { + if (tablet_metadata_->HasTxnMetadata(txn_id_)) { + // We created this transaction as a part of this op (i.e. it was not + // already in flight), and there is existing metadata for it. + return Status::IllegalState( + strings::Substitute("Transaction metadata for transaction $0 already exists", + txn_id_)); + } return Status::NotFound("Transaction hasn't been successfully started"); } return Status::OK(); @@ -211,7 +226,12 @@ class Txn : public RefCountedThreadSafe<Txn> { // Log anchor registry with which to anchor WAL segments, and an anchor to // update upon applying a state change. log::LogAnchorRegistry* log_anchor_registry_; - log::LogAnchor log_anchor_; + + // Anchor used to prevent GC of a BEGIN_COMMIT op. + log::LogAnchor begin_commit_anchor_; + + // Tablet metadata used to persist this transaction's metadata. + scoped_refptr<TabletMetadata> tablet_metadata_; // Lock protecting access to 'state_' and 'commit_timestamp'. Ops that intend // on mutating 'state_' must take this lock in write mode. Ops that intend on @@ -237,6 +257,9 @@ class Txn : public RefCountedThreadSafe<Txn> { // Tracks the on-going transactions in which a given tablet is participating. class TxnParticipant { public: + explicit TxnParticipant(scoped_refptr<TabletMetadata> tmeta) + : tablet_metadata_(std::move(tmeta)) {} + // Convenience struct representing a Txn of this participant. This is useful // for testing, as it easy to construct. 
struct TxnEntry { @@ -245,6 +268,10 @@ class TxnParticipant { int64_t commit_timestamp; }; + // Creates a transaction in kOpen. + void CreateOpenTransaction(int64_t txn_id, + log::LogAnchorRegistry* log_anchor_registry); + // Gets the transaction state for the given transaction ID, creating it in // the kInitializing state if one doesn't already exist. scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id, @@ -266,9 +293,11 @@ class TxnParticipant { // // Returns whether or not this call actually cleared the transaction (i.e. // returns 'false' if the transaction was not found).. - bool ClearIfCompleteForTests(int64_t txn_id); + bool ClearIfComplete(int64_t txn_id); - // Returns the transactions, sorted by transaction ID. + // Returns the transactions, sorted by transaction ID. This returns both + // in-flight transactions tracked by 'txns_' as well as transactions that + // have terminated and persisted metadata via abort or commit. std::vector<TxnEntry> GetTxnsForTests() const; private: @@ -277,6 +306,9 @@ class TxnParticipant { // Maps from transaction ID to the corresponding transaction state. std::unordered_map<int64_t, scoped_refptr<Txn>> txns_; + + // Tablet metadata used to persist this transaction's metadata. + scoped_refptr<TabletMetadata> tablet_metadata_; }; inline bool operator==(const TxnParticipant::TxnEntry& lhs, const TxnParticipant::TxnEntry& rhs) {
