This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e5202c0fc04f97d22ea3eb88625d0bd9d86d07f0 Author: Andrew Wong <[email protected]> AuthorDate: Wed May 12 18:40:59 2021 -0700 [txns] add a means to get transaction metadata from participant This patch adds a GET_METADATA participant RPC and a method to call it through the TxnSystemClient. The RPC currently returns the TxnMetadataPB for the given transaction, giving a bit of insight into the state of the given participant. I will use this in an upcoming tool that drills into a transaction and its participants. Change-Id: I4b03f13f174bd9a83609fb7ed6106746777b4207 Reviewed-on: http://gerrit.cloudera.org:8080/17446 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- .../integration-tests/txn_participant-itest.cc | 62 +++++++++++++++++++++- src/kudu/tablet/tablet_metadata.cc | 30 +++++------ src/kudu/tablet/tablet_metadata.h | 4 ++ src/kudu/tablet/txn_metadata.h | 28 ++++++++-- src/kudu/tablet/txn_participant-test.cc | 54 +++++++++++++++++++ src/kudu/transactions/participant_rpc.cc | 23 +++++--- src/kudu/transactions/participant_rpc.h | 11 +++- src/kudu/transactions/txn_system_client.cc | 15 ++++-- src/kudu/transactions/txn_system_client.h | 10 +++- src/kudu/tserver/tablet_service.cc | 27 ++++++++++ src/kudu/tserver/tserver_admin.proto | 7 +++ 11 files changed, 232 insertions(+), 39 deletions(-) diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc index 46f6bc9..a8e7267 100644 --- a/src/kudu/integration-tests/txn_participant-itest.cc +++ b/src/kudu/integration-tests/txn_participant-itest.cc @@ -34,10 +34,10 @@ #include "kudu/clock/clock.h" #include "kudu/common/partial_row.h" #include "kudu/common/row_operations.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/timestamp.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/raft_consensus.h" #include 
"kudu/consensus/time_manager.h" #include "kudu/gutil/ref_counted.h" @@ -98,7 +98,7 @@ using kudu::tablet::kInitializing; using kudu::tablet::kOpen; using kudu::tablet::MakeParticipantOp; using kudu::tablet::TabletReplica; -using kudu::tablet::TxnState; +using kudu::tablet::TxnMetadataPB; using kudu::tablet::TxnParticipant; using kudu::tablet::TxnState; using kudu::transactions::TxnSystemClient; @@ -831,6 +831,64 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) { } } +TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) { + unique_ptr<TxnSystemClient> txn_client; + ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), &txn_client)); + constexpr const auto kTxnId = 0; + constexpr const int kLeaderIdx = 0; + vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx); + auto* leader_replica = replicas[kLeaderIdx]; + const auto tablet_id = leader_replica->tablet_id(); + ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout)); + + // Get commit-related metadata. 
+ TxnMetadataPB meta_pb; + Status s = txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + /*begin_commit_timestamp*/nullptr, &meta_pb); + ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); + + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + /*begin_commit_timestamp*/nullptr, &meta_pb)); + ASSERT_FALSE(meta_pb.has_aborted()); + ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(meta_pb.has_commit_timestamp()); + + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), kDefaultTimeout)); + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + /*begin_commit_timestamp*/nullptr, &meta_pb)); + ASSERT_FALSE(meta_pb.has_aborted()); + ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(meta_pb.has_commit_timestamp()); + + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), kDefaultTimeout)); + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + /*begin_commit_timestamp*/nullptr, &meta_pb)); + ASSERT_FALSE(meta_pb.has_aborted()); + ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp()); + ASSERT_TRUE(meta_pb.has_commit_timestamp()); + + // Get abort-related metadata. 
+ constexpr const auto kAbortedTxnId = 1; + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN), kDefaultTimeout)); + ASSERT_OK(txn_client->ParticipateInTransaction( + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + /*begin_commit_timestamp*/nullptr, &meta_pb)); + ASSERT_TRUE(meta_pb.has_aborted()); + ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(meta_pb.has_commit_timestamp()); +} + // Test that we can start multiple transactions on the same participant. TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) { constexpr const int kLeaderIdx = 0; diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index fe321b2..6e635a5 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -654,7 +654,7 @@ Status TabletMetadata::UpdateUnlocked( } for (const auto& txn_id : txns_being_flushed) { auto txn_meta = FindOrDie(txn_metadata_by_txn_id_, txn_id); - txn_meta->set_flushed_committed_mrs_unlocked(); + txn_meta->set_flushed_committed_mrs(); } RowSetMetadataVector new_rowsets = rowsets_; @@ -746,22 +746,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block, } for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) { - TxnMetadataPB meta_pb; const auto& txn_meta = txn_id_and_metadata.second; - const auto& commit_ts = txn_meta->commit_timestamp(); - if (commit_ts) { - meta_pb.set_commit_timestamp(commit_ts->value()); - } - const auto& commit_mvcc_op_ts = txn_meta->commit_mvcc_op_timestamp(); - if (commit_mvcc_op_ts) { - meta_pb.set_commit_mvcc_op_timestamp(commit_mvcc_op_ts->value()); - } - if (txn_meta->aborted()) { - meta_pb.set_aborted(true); - } - if 
(txn_meta->flushed_committed_mrs_unlocked()) { - meta_pb.set_flushed_committed_mrs(true); - } + TxnMetadataPB meta_pb = txn_meta->ToPB(); InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb); } @@ -898,7 +884,7 @@ void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids, // If we have not flushed the MRS after committing, the bootstrap process // will need to create an MRS for it, even if the transaction is committed. if (txn_ids_with_mrs && - !txn_meta->flushed_committed_mrs_unlocked() && + !txn_meta->flushed_committed_mrs() && !txn_meta->aborted()) { EmplaceOrDie(&needs_mrs, txn_id); } @@ -917,6 +903,16 @@ unordered_map<int64_t, scoped_refptr<TxnMetadata>> TabletMetadata::GetTxnMetadat return txn_metadata_by_txn_id_; } +bool TabletMetadata::GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const { + std::lock_guard<LockType> l(data_lock_); + auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id); + if (!txn_meta) { + return false; + } + *pb = txn_meta->ToPB(); + return true; +} + const RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) const { for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) { if (rowset_meta->id() == id) { diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index b9edeeb..55053b9 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -278,6 +278,10 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr, Timestamp* timestamp = nullptr); + // Populates 'pb' with metadata for the given transaction, returning false if + // the transaction doesn't exist and true otherwise. + bool GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const; + // Returns the transaction IDs that were persisted as being in-flight, // terminal (committed or aborted), and having un-flushed MRSs. 
void GetTxnIds(std::unordered_set<int64_t>* in_flight_txn_ids, diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h index bcbeb0d..9debe6a 100644 --- a/src/kudu/tablet/txn_metadata.h +++ b/src/kudu/tablet/txn_metadata.h @@ -21,6 +21,7 @@ #include "kudu/common/timestamp.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/tablet/metadata.pb.h" #include "kudu/util/locks.h" #include "kudu/util/mutex.h" @@ -39,13 +40,12 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> { commit_timestamp_(std::move(commit_ts)), flushed_committed_mrs_(flushed_committed_mrs) {} - // NOTE: access to 'flushed_committed_mrs_' is not inherently threadsafe -- - // it is expected that the caller will ensure thread safety (e.g. - // TabletMetadata only calls this with its flush lock held). - void set_flushed_committed_mrs_unlocked() { + void set_flushed_committed_mrs() { + std::lock_guard<simple_spinlock> l(lock_); flushed_committed_mrs_ = true; } - bool flushed_committed_mrs_unlocked() const { + bool flushed_committed_mrs() const { + std::lock_guard<simple_spinlock> l(lock_); return flushed_committed_mrs_; } @@ -87,6 +87,24 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> { *commit_ts = commit_timestamp_; } + TxnMetadataPB ToPB() { + TxnMetadataPB pb; + std::lock_guard<simple_spinlock> l(lock_); + if (commit_timestamp_) { + pb.set_commit_timestamp(commit_timestamp_->value()); + } + if (commit_mvcc_op_timestamp_) { + pb.set_commit_mvcc_op_timestamp(commit_mvcc_op_timestamp_->value()); + } + if (aborted_) { + pb.set_aborted(true); + } + if (flushed_committed_mrs_) { + pb.set_flushed_committed_mrs(true); + } + return pb; + } + private: friend class RefCountedThreadSafe<TxnMetadata>; ~TxnMetadata() = default; diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc index 41ceb44..e1451ab 100644 --- a/src/kudu/tablet/txn_participant-test.cc +++ b/src/kudu/tablet/txn_participant-test.cc @@ -52,6 +52,7 @@ 
#include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/tablet/metadata.pb.h" #include "kudu/tablet/ops/op.h" #include "kudu/tablet/ops/op_driver.h" #include "kudu/tablet/ops/op_tracker.h" @@ -608,6 +609,59 @@ TEST_F(TxnParticipantTest, TestTakePartitionLockOnRestart) { ASSERT_OK(Write(3, kTxnTwo)); } +TEST_F(TxnParticipantTest, TestGetAbortedTxnMetadata) { + TxnMetadataPB pb; + const auto& meta = tablet_replica_->tablet_metadata(); + ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::ABORT_TXN, + kDummyCommitTimestamp)); + ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); + ASSERT_TRUE(pb.aborted()); + ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(pb.has_commit_timestamp()); + ASSERT_FALSE(pb.has_flushed_committed_mrs()); +} + +TEST_F(TxnParticipantTest, TestGetCommittedTxnMetadata) { + TxnMetadataPB pb; + const auto& meta = tablet_replica_->tablet_metadata(); + ASSERT_FALSE(meta->GetTxnMetadataPB(kTxnId, &pb)); + ASSERT_FALSE(pb.has_aborted()); + ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(pb.has_commit_timestamp()); + ASSERT_FALSE(pb.has_flushed_committed_mrs()); + + ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN, + kDummyCommitTimestamp)); + // There will be metadata, but it will be empty since there is no + // commit/abort information yet. + ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); + ASSERT_FALSE(pb.has_aborted()); + ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp()); + ASSERT_FALSE(pb.has_commit_timestamp()); + ASSERT_FALSE(pb.has_flushed_committed_mrs()); + + // Once we begin committing, we should see the BEGIN_COMMIT op timestamp. 
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT, + kDummyCommitTimestamp)); + ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); + ASSERT_FALSE(pb.has_aborted()); + ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp()); + ASSERT_LT(0, pb.commit_mvcc_op_timestamp()); + ASSERT_FALSE(pb.has_commit_timestamp()); + ASSERT_FALSE(pb.has_flushed_committed_mrs()); + + // Once we finalize the commit, we should see a commit timestamp. + ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, + kDummyCommitTimestamp)); + ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb)); + ASSERT_FALSE(pb.has_aborted()); + ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp()); + ASSERT_LT(0, pb.commit_mvcc_op_timestamp()); + ASSERT_TRUE(pb.has_commit_timestamp()); + ASSERT_EQ(kDummyCommitTimestamp, pb.commit_timestamp()); + ASSERT_FALSE(pb.has_flushed_committed_mrs()); +} + // Test that participant ops result in tablet metadata updates that can survive // restarts, and that the appropriate anchors are in place as we progress // through a transaction's life cycle. 
diff --git a/src/kudu/transactions/participant_rpc.cc b/src/kudu/transactions/participant_rpc.cc index 1cc76f6..091a547 100644 --- a/src/kudu/transactions/participant_rpc.cc +++ b/src/kudu/transactions/participant_rpc.cc @@ -38,6 +38,7 @@ #include "kudu/rpc/rpc.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/tablet/metadata.pb.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/tserver/tserver_admin.pb.h" #include "kudu/tserver/tserver_admin.proxy.h" @@ -53,6 +54,7 @@ using kudu::rpc::ErrorStatusPB; using kudu::rpc::ResponseCallback; using kudu::rpc::RetriableRpc; using kudu::rpc::RetriableRpcStatus; +using kudu::tablet::TxnMetadataPB; using kudu::tserver::TabletServerErrorPB; using std::string; using std::unique_ptr; @@ -67,7 +69,8 @@ ParticipantRpc* ParticipantRpc::NewRpc( unique_ptr<TxnParticipantContext> ctx, const MonoTime& deadline, StatusCallback user_cb, - Timestamp* begin_commit_timestamp) { + Timestamp* begin_commit_timestamp, + TxnMetadataPB* metadata_pb) { scoped_refptr<MetaCacheServerPicker> server_picker( new MetaCacheServerPicker(ctx->client, ctx->client->data_->meta_cache_, @@ -77,20 +80,23 @@ ParticipantRpc* ParticipantRpc::NewRpc( std::move(server_picker), deadline, std::move(user_cb), - begin_commit_timestamp); + begin_commit_timestamp, + metadata_pb); } ParticipantRpc::ParticipantRpc(unique_ptr<TxnParticipantContext> ctx, scoped_refptr<MetaCacheServerPicker> replica_picker, const MonoTime& deadline, StatusCallback user_cb, - Timestamp* begin_commit_timestamp) + Timestamp* begin_commit_timestamp, + TxnMetadataPB* metadata_pb) : RetriableRpc(std::move(replica_picker), ctx->client->data_->request_tracker_, deadline, ctx->client->data_->messenger_), client_(ctx->client), tablet_(std::move(ctx->tablet)), user_cb_(std::move(user_cb)), - begin_commit_timestamp_(begin_commit_timestamp) { + begin_commit_timestamp_(begin_commit_timestamp), + metadata_pb_(metadata_pb) { 
req_.set_tablet_id(tablet_->tablet_id()); *req_.mutable_op() = std::move(ctx->participant_op); } @@ -223,8 +229,13 @@ RetriableRpcStatus ParticipantRpc::AnalyzeResponse(const Status& rpc_cb_status) void ParticipantRpc::Finish(const Status& status) { // Free memory upon completion. unique_ptr<ParticipantRpc> delete_me(this); - if (status.ok() && begin_commit_timestamp_ && resp_.has_timestamp()) { - *begin_commit_timestamp_ = Timestamp(resp_.timestamp()); + if (status.ok()) { + if (begin_commit_timestamp_ && resp_.has_timestamp()) { + *begin_commit_timestamp_ = Timestamp(resp_.timestamp()); + } + if (metadata_pb_ && resp_.has_metadata()) { + *metadata_pb_ = resp_.metadata(); + } } user_cb_(status); } diff --git a/src/kudu/transactions/participant_rpc.h b/src/kudu/transactions/participant_rpc.h index f35c45f..dd4a8d0 100644 --- a/src/kudu/transactions/participant_rpc.h +++ b/src/kudu/transactions/participant_rpc.h @@ -38,6 +38,10 @@ namespace client { class KuduClient; } // namespace client +namespace tablet { +class TxnMetadataPB; +} // namespace tablet + namespace transactions { // Context to be used when sending RPCs to specific tablets. 
@@ -59,7 +63,8 @@ class ParticipantRpc final : public rpc::RetriableRpc<client::internal::RemoteTa static ParticipantRpc* NewRpc(std::unique_ptr<TxnParticipantContext> ctx, const MonoTime& deadline, StatusCallback user_cb, - Timestamp* begin_commit_timestamp = nullptr); + Timestamp* begin_commit_timestamp = nullptr, + tablet::TxnMetadataPB* metadata_pb = nullptr); ~ParticipantRpc() {} std::string ToString() const override; @@ -79,12 +84,14 @@ class ParticipantRpc final : public rpc::RetriableRpc<client::internal::RemoteTa scoped_refptr<client::internal::MetaCacheServerPicker> replica_picker, const MonoTime& deadline, StatusCallback user_cb, - Timestamp* begin_commit_timestamp); + Timestamp* begin_commit_timestamp, + tablet::TxnMetadataPB* metadata_pb); client::KuduClient* client_; scoped_refptr<client::internal::RemoteTablet> tablet_; const StatusCallback user_cb_; Timestamp* begin_commit_timestamp_; + tablet::TxnMetadataPB* metadata_pb_; }; } // namespace transactions diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc index 83e9ead..28bf23d 100644 --- a/src/kudu/transactions/txn_system_client.cc +++ b/src/kudu/transactions/txn_system_client.cc @@ -81,6 +81,7 @@ using kudu::master::PingRequestPB; using kudu::master::PingResponsePB; using kudu::rpc::Messenger; using kudu::rpc::RpcController; +using kudu::tablet::TxnMetadataPB; using kudu::tserver::CoordinatorOpPB; using kudu::tserver::CoordinatorOpResultPB; using kudu::tserver::ParticipantOpPB; @@ -370,10 +371,11 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx Status TxnSystemClient::ParticipateInTransaction(const string& tablet_id, const ParticipantOpPB& participant_op, const MonoDelta& timeout, - Timestamp* begin_commit_timestamp) { + Timestamp* begin_commit_timestamp, + TxnMetadataPB* metadata_pb) { Synchronizer sync; ParticipateInTransactionAsync(tablet_id, participant_op, timeout, - sync.AsStatusCallback(), 
begin_commit_timestamp); + sync.AsStatusCallback(), begin_commit_timestamp, metadata_pb); return sync.Wait(); } @@ -381,7 +383,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const string& tablet_id, ParticipantOpPB participant_op, const MonoDelta& timeout, StatusCallback cb, - Timestamp* begin_commit_timestamp) { + Timestamp* begin_commit_timestamp, + TxnMetadataPB* metadata_pb) { MonoTime deadline = MonoTime::Now() + timeout; unique_ptr<TxnParticipantContext> ctx( new TxnParticipantContext({ @@ -396,7 +399,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const string& tablet_id, // See https://taylorconor.com/blog/noncopyable-lambdas/ for more details. client_->data_->meta_cache_->LookupTabletById( client_.get(), tablet_id, deadline, &ctx_raw->tablet, - [cb = std::move(cb), deadline, ctx_raw, begin_commit_timestamp] (const Status& s) mutable { + [cb = std::move(cb), deadline, ctx_raw, begin_commit_timestamp, metadata_pb] + (const Status& s) mutable { unique_ptr<TxnParticipantContext> unique_ctx(ctx_raw); if (PREDICT_FALSE(!s.ok())) { cb(s); @@ -406,7 +410,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const string& tablet_id, std::move(unique_ctx), deadline, std::move(cb), - begin_commit_timestamp); + begin_commit_timestamp, + metadata_pb); rpc->SendRpc(); }); } diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h index 6b4c193..6c919a8 100644 --- a/src/kudu/transactions/txn_system_client.h +++ b/src/kudu/transactions/txn_system_client.h @@ -52,6 +52,10 @@ namespace rpc { class Messenger; } // namespace rpc +namespace tablet { +class TxnMetadataPB; +} // namespace tablet + namespace tserver { class CoordinatorOpPB; class CoordinatorOpResultPB; @@ -151,12 +155,14 @@ class TxnSystemClient { Status ParticipateInTransaction(const std::string& tablet_id, const tserver::ParticipantOpPB& participant_op, const MonoDelta& timeout, - Timestamp* begin_commit_timestamp = nullptr); + Timestamp* 
begin_commit_timestamp = nullptr, + tablet::TxnMetadataPB* metadata_pb = nullptr); void ParticipateInTransactionAsync(const std::string& tablet_id, tserver::ParticipantOpPB participant_op, const MonoDelta& timeout, StatusCallback cb, - Timestamp* begin_commit_timestamp = nullptr); + Timestamp* begin_commit_timestamp = nullptr, + tablet::TxnMetadataPB* metadata_pb = nullptr); private: friend class itest::TxnStatusTableITest; diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 4cd699f..8adc52b 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -234,6 +234,7 @@ using kudu::tablet::TABLET_DATA_TOMBSTONED; using kudu::tablet::Tablet; using kudu::tablet::TabletReplica; using kudu::tablet::TabletStatePB; +using kudu::tablet::TxnMetadataPB;; using kudu::tablet::WriteAuthorizationContext; using kudu::tablet::WriteOpState; using kudu::tablet::WritePrivilegeType; @@ -1336,6 +1337,15 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe void TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB* req, ParticipantResponsePB* resp, RpcContext* context) { + if (!req->has_op() || !req->op().has_type() || !req->op().has_txn_id() || + !req->has_tablet_id()) { + Status s = Status::InvalidArgument( + Substitute("Missing fields in request: $0", SecureShortDebugString(*req))); + SetupErrorAndRespond(resp->mutable_error(), s, + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } scoped_refptr<TabletReplica> replica; if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context, &replica)) { @@ -1348,6 +1358,23 @@ void TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); return; } + const auto& op = req->op(); + if (op.type() == ParticipantOpPB::GET_METADATA) { + TxnMetadataPB pb; + const auto* meta = tablet->metadata(); + 
if (meta->GetTxnMetadataPB(op.txn_id(), &pb)) { + *resp->mutable_metadata() = std::move(pb); + context->RespondSuccess(); + return; + } + SetupErrorAndRespond( + resp->mutable_error(), + Status::InvalidArgument(Substitute("txn ID $0 has no metadata", op.txn_id())), + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } + // TODO(awong): consider memory-based throttling? // TODO(awong): we should also persist the transaction's owner, and prevent // other users from mutating it. diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto index 9572318..ea9435b 100644 --- a/src/kudu/tserver/tserver_admin.proto +++ b/src/kudu/tserver/tserver_admin.proto @@ -102,6 +102,9 @@ message ParticipantOpPB { BEGIN_COMMIT = 2; FINALIZE_COMMIT = 3; ABORT_TXN = 4; + + // Get metadata about the given participant. + GET_METADATA = 5; } optional ParticipantOpType type = 1; optional int64 txn_id = 2; @@ -120,6 +123,10 @@ message ParticipantResponsePB { // The timestamp chosen by the server for this participant op. optional fixed64 timestamp = 2; + + // Metadata returned for a GET_METADATA op. May be absent if no metadata + // existed on the given participant for the given transaction. + optional tablet.TxnMetadataPB metadata = 3; } message AlterSchemaRequestPB {
