This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b031fd11783d370663c8bcb220c2da3ab91d99cd
Author: Andrew Wong <[email protected]>
AuthorDate: Wed May 12 18:40:59 2021 -0700

    [txns] add a means to get transaction metadata from participant
    
    This patch adds a GET_METADATA participant RPC and a method to call it
    through the TxnSystemClient. The RPC currently returns the TxnMetadataPB
    for the given transaction, giving a bit of insight into the state of the
    given participant.
    
    I will use this in an upcoming tool that drills into a transaction and
    its participants.
    
    Change-Id: I4b03f13f174bd9a83609fb7ed6106746777b4207
    Reviewed-on: http://gerrit.cloudera.org:8080/17446
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/17467
    Reviewed-by: Bankim Bhavsar <[email protected]>
    Tested-by: Andrew Wong <[email protected]>
---
 .../integration-tests/txn_participant-itest.cc     | 62 +++++++++++++++++++++-
 src/kudu/tablet/tablet_metadata.cc                 | 30 +++++------
 src/kudu/tablet/tablet_metadata.h                  |  4 ++
 src/kudu/tablet/txn_metadata.h                     | 28 ++++++++--
 src/kudu/tablet/txn_participant-test.cc            | 54 +++++++++++++++++++
 src/kudu/transactions/participant_rpc.cc           | 23 +++++---
 src/kudu/transactions/participant_rpc.h            | 11 +++-
 src/kudu/transactions/txn_system_client.cc         | 15 ++++--
 src/kudu/transactions/txn_system_client.h          | 10 +++-
 src/kudu/tserver/tablet_service.cc                 | 27 ++++++++++
 src/kudu/tserver/tserver_admin.proto               |  7 +++
 11 files changed, 232 insertions(+), 39 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc 
b/src/kudu/integration-tests/txn_participant-itest.cc
index 46f6bc9..a8e7267 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -34,10 +34,10 @@
 #include "kudu/clock/clock.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/ref_counted.h"
@@ -98,7 +98,7 @@ using kudu::tablet::kInitializing;
 using kudu::tablet::kOpen;
 using kudu::tablet::MakeParticipantOp;
 using kudu::tablet::TabletReplica;
-using kudu::tablet::TxnState;
+using kudu::tablet::TxnMetadataPB;
 using kudu::tablet::TxnParticipant;
 using kudu::tablet::TxnState;
 using kudu::transactions::TxnSystemClient;
@@ -831,6 +831,64 @@ TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
   }
 }
 
+TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) {
+  unique_ptr<TxnSystemClient> txn_client;
+  ASSERT_OK(TxnSystemClient::Create(cluster_->master_rpc_addrs(), 
&txn_client));
+  constexpr const auto kTxnId = 0;
+  constexpr const int kLeaderIdx = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  const auto tablet_id = leader_replica->tablet_id();
+  
ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+
+  // Get commit-related metadata. Before BEGIN_TXN this yields InvalidArgument.
+  TxnMetadataPB meta_pb;
+  Status s = txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), 
kDefaultTimeout,
+      /*begin_commit_timestamp*/nullptr, &meta_pb);
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), 
kDefaultTimeout));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), 
kDefaultTimeout,
+      /*begin_commit_timestamp*/nullptr, &meta_pb));
+  ASSERT_FALSE(meta_pb.has_aborted());
+  ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(meta_pb.has_commit_timestamp());
+
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), 
kDefaultTimeout));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), 
kDefaultTimeout,
+      /*begin_commit_timestamp*/nullptr, &meta_pb));
+  ASSERT_FALSE(meta_pb.has_aborted());
+  ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(meta_pb.has_commit_timestamp());
+
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), 
kDefaultTimeout));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), 
kDefaultTimeout,
+      /*begin_commit_timestamp*/nullptr, &meta_pb));
+  ASSERT_FALSE(meta_pb.has_aborted());
+  ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp());
+  ASSERT_TRUE(meta_pb.has_commit_timestamp());
+
+  // Get abort-related metadata, using a second transaction that gets aborted.
+  constexpr const auto kAbortedTxnId = 1;
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN), 
kDefaultTimeout));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN), 
kDefaultTimeout));
+  ASSERT_OK(txn_client->ParticipateInTransaction(
+      tablet_id, MakeParticipantOp(kAbortedTxnId, 
ParticipantOpPB::GET_METADATA), kDefaultTimeout,
+      /*begin_commit_timestamp*/nullptr, &meta_pb));
+  ASSERT_TRUE(meta_pb.has_aborted());
+  ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(meta_pb.has_commit_timestamp());
+}
+
 // Test that we can start multiple transactions on the same participant.
 TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) {
   constexpr const int kLeaderIdx = 0;
diff --git a/src/kudu/tablet/tablet_metadata.cc 
b/src/kudu/tablet/tablet_metadata.cc
index fe321b2..6e635a5 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -654,7 +654,7 @@ Status TabletMetadata::UpdateUnlocked(
   }
   for (const auto& txn_id : txns_being_flushed) {
     auto txn_meta = FindOrDie(txn_metadata_by_txn_id_, txn_id);
-    txn_meta->set_flushed_committed_mrs_unlocked();
+    txn_meta->set_flushed_committed_mrs();
   }
 
   RowSetMetadataVector new_rowsets = rowsets_;
@@ -746,22 +746,8 @@ Status 
TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
   }
 
   for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) {
-    TxnMetadataPB meta_pb;
     const auto& txn_meta = txn_id_and_metadata.second;
-    const auto& commit_ts = txn_meta->commit_timestamp();
-    if (commit_ts) {
-      meta_pb.set_commit_timestamp(commit_ts->value());
-    }
-    const auto& commit_mvcc_op_ts = txn_meta->commit_mvcc_op_timestamp();
-    if (commit_mvcc_op_ts) {
-      meta_pb.set_commit_mvcc_op_timestamp(commit_mvcc_op_ts->value());
-    }
-    if (txn_meta->aborted()) {
-      meta_pb.set_aborted(true);
-    }
-    if (txn_meta->flushed_committed_mrs_unlocked()) {
-      meta_pb.set_flushed_committed_mrs(true);
-    }
+    TxnMetadataPB meta_pb = txn_meta->ToPB();
     InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb);
   }
 
@@ -898,7 +884,7 @@ void TabletMetadata::GetTxnIds(unordered_set<int64_t>* 
in_flight_txn_ids,
     // If we have not flushed the MRS after committing, the bootstrap process
     // will need to create an MRS for it, even if the transaction is committed.
     if (txn_ids_with_mrs &&
-        !txn_meta->flushed_committed_mrs_unlocked() &&
+        !txn_meta->flushed_committed_mrs() &&
         !txn_meta->aborted()) {
       EmplaceOrDie(&needs_mrs, txn_id);
     }
@@ -917,6 +903,16 @@ unordered_map<int64_t, scoped_refptr<TxnMetadata>> 
TabletMetadata::GetTxnMetadat
   return txn_metadata_by_txn_id_;
 }
 
+bool TabletMetadata::GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const {
+  // The transaction map is read while holding 'data_lock_'.
+  std::lock_guard<LockType> l(data_lock_);
+  auto found_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
+  if (found_meta) {
+    // Known transaction: hand back its metadata in protobuf form.
+    *pb = found_meta->ToPB();
+    return true;
+  }
+  // Unknown transaction: 'pb' is left untouched.
+  return false;
+}
+
 const RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) const {
   for (const shared_ptr<RowSetMetadata>& rowset_meta : rowsets_) {
     if (rowset_meta->id() == id) {
diff --git a/src/kudu/tablet/tablet_metadata.h 
b/src/kudu/tablet/tablet_metadata.h
index b9edeeb..55053b9 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -278,6 +278,10 @@ class TabletMetadata : public 
RefCountedThreadSafe<TabletMetadata> {
   bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr,
                       Timestamp* timestamp = nullptr);
 
+  // Populates 'pb' with metadata for the given transaction, returning false if
+  // the transaction doesn't exist and true otherwise.
+  bool GetTxnMetadataPB(int64_t txn_id, TxnMetadataPB* pb) const;
+
   // Returns the transaction IDs that were persisted as being in-flight,
   // terminal (committed or aborted), and having un-flushed MRSs.
   void GetTxnIds(std::unordered_set<int64_t>* in_flight_txn_ids,
diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h
index bcbeb0d..9debe6a 100644
--- a/src/kudu/tablet/txn_metadata.h
+++ b/src/kudu/tablet/txn_metadata.h
@@ -21,6 +21,7 @@
 
 #include "kudu/common/timestamp.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/mutex.h"
 
@@ -39,13 +40,12 @@ class TxnMetadata : public 
RefCountedThreadSafe<TxnMetadata> {
         commit_timestamp_(std::move(commit_ts)),
         flushed_committed_mrs_(flushed_committed_mrs) {}
 
-  // NOTE: access to 'flushed_committed_mrs_' is not inherently threadsafe --
-  // it is expected that the caller will ensure thread safety (e.g.
-  // TabletMetadata only calls this with its flush lock held).
-  void set_flushed_committed_mrs_unlocked() {
+  void set_flushed_committed_mrs() {
+    std::lock_guard<simple_spinlock> l(lock_);
     flushed_committed_mrs_ = true;
   }
-  bool flushed_committed_mrs_unlocked() const {
+  bool flushed_committed_mrs() const {
+    std::lock_guard<simple_spinlock> l(lock_);
     return flushed_committed_mrs_;
   }
 
@@ -87,6 +87,24 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata> 
{
     *commit_ts = commit_timestamp_;
   }
 
+  // Returns a TxnMetadataPB describing this transaction's metadata, built
+  // while holding 'lock_'. A field is only set in the result when there is
+  // something to report: the commit timestamp and the commit MVCC op
+  // timestamp when they have values, and 'aborted' and
+  // 'flushed_committed_mrs' only when they are true.
+  //
+  // This only reads state, so it is const and callable through const
+  // references, like the other read-only accessors of this class.
+  TxnMetadataPB ToPB() const {
+    TxnMetadataPB pb;
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (commit_timestamp_) {
+      pb.set_commit_timestamp(commit_timestamp_->value());
+    }
+    if (commit_mvcc_op_timestamp_) {
+      pb.set_commit_mvcc_op_timestamp(commit_mvcc_op_timestamp_->value());
+    }
+    if (aborted_) {
+      pb.set_aborted(true);
+    }
+    if (flushed_committed_mrs_) {
+      pb.set_flushed_committed_mrs(true);
+    }
+    return pb;
+  }
+
  private:
   friend class RefCountedThreadSafe<TxnMetadata>;
   ~TxnMetadata() = default;
diff --git a/src/kudu/tablet/txn_participant-test.cc 
b/src/kudu/tablet/txn_participant-test.cc
index 41ceb44..e1451ab 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -52,6 +52,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/ops/op.h"
 #include "kudu/tablet/ops/op_driver.h"
 #include "kudu/tablet/ops/op_tracker.h"
@@ -608,6 +609,59 @@ TEST_F(TxnParticipantTest, TestTakePartitionLockOnRestart) 
{
   ASSERT_OK(Write(3, kTxnTwo));
 }
 
+TEST_F(TxnParticipantTest, TestGetAbortedTxnMetadata) {
+  // Abort the transaction, then check what the tablet metadata reports.
+  const auto& tablet_meta = tablet_replica_->tablet_metadata();
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::ABORT_TXN,
+                                       kDummyCommitTimestamp));
+
+  // Only the abort should be recorded; no commit-related field is set.
+  TxnMetadataPB txn_pb;
+  ASSERT_TRUE(tablet_meta->GetTxnMetadataPB(kTxnId, &txn_pb));
+  ASSERT_TRUE(txn_pb.aborted());
+  ASSERT_FALSE(txn_pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(txn_pb.has_commit_timestamp());
+  ASSERT_FALSE(txn_pb.has_flushed_committed_mrs());
+}
+
+TEST_F(TxnParticipantTest, TestGetCommittedTxnMetadata) {
+  TxnMetadataPB pb;
+  const auto& meta = tablet_replica_->tablet_metadata();
+  ASSERT_FALSE(meta->GetTxnMetadataPB(kTxnId, &pb));
+  ASSERT_FALSE(pb.has_aborted());
+  ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(pb.has_commit_timestamp());
+  ASSERT_FALSE(pb.has_flushed_committed_mrs());
+
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
+                                       kDummyCommitTimestamp));
+  // There will be metadata, but it will be empty since there is no
+  // commit/abort information yet.
+  ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb));
+  ASSERT_FALSE(pb.has_aborted());
+  ASSERT_FALSE(pb.has_commit_mvcc_op_timestamp());
+  ASSERT_FALSE(pb.has_commit_timestamp());
+  ASSERT_FALSE(pb.has_flushed_committed_mrs());
+
+  // Once we begin committing, we should see the BEGIN_COMMIT op timestamp.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+                                       kDummyCommitTimestamp));
+  ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb));
+  ASSERT_FALSE(pb.has_aborted());
+  ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp());
+  ASSERT_LT(0, pb.commit_mvcc_op_timestamp());
+  ASSERT_FALSE(pb.has_commit_timestamp());
+  ASSERT_FALSE(pb.has_flushed_committed_mrs());
+
+  // Once we finalize the commit, we should see a commit timestamp.
+  ASSERT_OK(CallParticipantOpCheckResp(kTxnId, 
ParticipantOpPB::FINALIZE_COMMIT,
+                                       kDummyCommitTimestamp));
+  ASSERT_TRUE(meta->GetTxnMetadataPB(kTxnId, &pb));
+  ASSERT_FALSE(pb.has_aborted());
+  ASSERT_TRUE(pb.has_commit_mvcc_op_timestamp());
+  ASSERT_LT(0, pb.commit_mvcc_op_timestamp());
+  ASSERT_TRUE(pb.has_commit_timestamp());
+  ASSERT_EQ(kDummyCommitTimestamp, pb.commit_timestamp());
+  ASSERT_FALSE(pb.has_flushed_committed_mrs());
+}
+
 // Test that participant ops result in tablet metadata updates that can survive
 // restarts, and that the appropriate anchors are in place as we progress
 // through a transaction's life cycle.
diff --git a/src/kudu/transactions/participant_rpc.cc 
b/src/kudu/transactions/participant_rpc.cc
index 1cc76f6..091a547 100644
--- a/src/kudu/transactions/participant_rpc.cc
+++ b/src/kudu/transactions/participant_rpc.cc
@@ -38,6 +38,7 @@
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
@@ -53,6 +54,7 @@ using kudu::rpc::ErrorStatusPB;
 using kudu::rpc::ResponseCallback;
 using kudu::rpc::RetriableRpc;
 using kudu::rpc::RetriableRpcStatus;
+using kudu::tablet::TxnMetadataPB;
 using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::unique_ptr;
@@ -67,7 +69,8 @@ ParticipantRpc* ParticipantRpc::NewRpc(
     unique_ptr<TxnParticipantContext> ctx,
     const MonoTime& deadline,
     StatusCallback user_cb,
-    Timestamp* begin_commit_timestamp) {
+    Timestamp* begin_commit_timestamp,
+    TxnMetadataPB* metadata_pb) {
   scoped_refptr<MetaCacheServerPicker> server_picker(
       new MetaCacheServerPicker(ctx->client,
                                 ctx->client->data_->meta_cache_,
@@ -77,20 +80,23 @@ ParticipantRpc* ParticipantRpc::NewRpc(
                             std::move(server_picker),
                             deadline,
                             std::move(user_cb),
-                            begin_commit_timestamp);
+                            begin_commit_timestamp,
+                            metadata_pb);
 }
 
 ParticipantRpc::ParticipantRpc(unique_ptr<TxnParticipantContext> ctx,
                                scoped_refptr<MetaCacheServerPicker> 
replica_picker,
                                const MonoTime& deadline,
                                StatusCallback user_cb,
-                               Timestamp* begin_commit_timestamp)
+                               Timestamp* begin_commit_timestamp,
+                               TxnMetadataPB* metadata_pb)
     : RetriableRpc(std::move(replica_picker), 
ctx->client->data_->request_tracker_,
                    deadline, ctx->client->data_->messenger_),
       client_(ctx->client),
       tablet_(std::move(ctx->tablet)),
       user_cb_(std::move(user_cb)),
-      begin_commit_timestamp_(begin_commit_timestamp) {
+      begin_commit_timestamp_(begin_commit_timestamp),
+      metadata_pb_(metadata_pb) {
   req_.set_tablet_id(tablet_->tablet_id());
   *req_.mutable_op() = std::move(ctx->participant_op);
 }
@@ -223,8 +229,13 @@ RetriableRpcStatus ParticipantRpc::AnalyzeResponse(const 
Status& rpc_cb_status)
 void ParticipantRpc::Finish(const Status& status) {
   // Free memory upon completion.
   unique_ptr<ParticipantRpc> delete_me(this);
-  if (status.ok() && begin_commit_timestamp_ && resp_.has_timestamp()) {
-    *begin_commit_timestamp_ = Timestamp(resp_.timestamp());
+  if (status.ok()) {
+    if (begin_commit_timestamp_ && resp_.has_timestamp()) {
+      *begin_commit_timestamp_ = Timestamp(resp_.timestamp());
+    }
+    if (metadata_pb_ && resp_.has_metadata()) {
+      *metadata_pb_ = resp_.metadata();
+    }
   }
   user_cb_(status);
 }
diff --git a/src/kudu/transactions/participant_rpc.h 
b/src/kudu/transactions/participant_rpc.h
index f35c45f..dd4a8d0 100644
--- a/src/kudu/transactions/participant_rpc.h
+++ b/src/kudu/transactions/participant_rpc.h
@@ -38,6 +38,10 @@ namespace client {
 class KuduClient;
 } // namespace client
 
+namespace tablet {
+class TxnMetadataPB;
+} // namespace tablet
+
 namespace transactions {
 
 // Context to be used when sending RPCs to specific tablets.
@@ -59,7 +63,8 @@ class ParticipantRpc final : public 
rpc::RetriableRpc<client::internal::RemoteTa
   static ParticipantRpc* NewRpc(std::unique_ptr<TxnParticipantContext> ctx,
                                 const MonoTime& deadline,
                                 StatusCallback user_cb,
-                                Timestamp* begin_commit_timestamp = nullptr);
+                                Timestamp* begin_commit_timestamp = nullptr,
+                                tablet::TxnMetadataPB* metadata_pb = nullptr);
   ~ParticipantRpc() {}
   std::string ToString() const override;
 
@@ -79,12 +84,14 @@ class ParticipantRpc final : public 
rpc::RetriableRpc<client::internal::RemoteTa
                  scoped_refptr<client::internal::MetaCacheServerPicker> 
replica_picker,
                  const MonoTime& deadline,
                  StatusCallback user_cb,
-                 Timestamp* begin_commit_timestamp);
+                 Timestamp* begin_commit_timestamp,
+                 tablet::TxnMetadataPB* metadata_pb);
 
   client::KuduClient* client_;
   scoped_refptr<client::internal::RemoteTablet> tablet_;
   const StatusCallback user_cb_;
   Timestamp* begin_commit_timestamp_;
+  tablet::TxnMetadataPB* metadata_pb_;
 };
 
 } // namespace transactions
diff --git a/src/kudu/transactions/txn_system_client.cc 
b/src/kudu/transactions/txn_system_client.cc
index 83e9ead..28bf23d 100644
--- a/src/kudu/transactions/txn_system_client.cc
+++ b/src/kudu/transactions/txn_system_client.cc
@@ -81,6 +81,7 @@ using kudu::master::PingRequestPB;
 using kudu::master::PingResponsePB;
 using kudu::rpc::Messenger;
 using kudu::rpc::RpcController;
+using kudu::tablet::TxnMetadataPB;
 using kudu::tserver::CoordinatorOpPB;
 using kudu::tserver::CoordinatorOpResultPB;
 using kudu::tserver::ParticipantOpPB;
@@ -370,10 +371,11 @@ Status 
TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx
 Status TxnSystemClient::ParticipateInTransaction(const string& tablet_id,
                                                  const ParticipantOpPB& 
participant_op,
                                                  const MonoDelta& timeout,
-                                                 Timestamp* 
begin_commit_timestamp) {
+                                                 Timestamp* 
begin_commit_timestamp,
+                                                 TxnMetadataPB* metadata_pb) {
   Synchronizer sync;
   ParticipateInTransactionAsync(tablet_id, participant_op, timeout,
-                                sync.AsStatusCallback(), 
begin_commit_timestamp);
+                                sync.AsStatusCallback(), 
begin_commit_timestamp, metadata_pb);
   return sync.Wait();
 }
 
@@ -381,7 +383,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const 
string& tablet_id,
                                                     ParticipantOpPB 
participant_op,
                                                     const MonoDelta& timeout,
                                                     StatusCallback cb,
-                                                    Timestamp* 
begin_commit_timestamp) {
+                                                    Timestamp* 
begin_commit_timestamp,
+                                                    TxnMetadataPB* 
metadata_pb) {
   MonoTime deadline = MonoTime::Now() + timeout;
   unique_ptr<TxnParticipantContext> ctx(
       new TxnParticipantContext({
@@ -396,7 +399,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const 
string& tablet_id,
   // See https://taylorconor.com/blog/noncopyable-lambdas/ for more details.
   client_->data_->meta_cache_->LookupTabletById(
       client_.get(), tablet_id, deadline, &ctx_raw->tablet,
-      [cb = std::move(cb), deadline, ctx_raw, begin_commit_timestamp] (const 
Status& s) mutable {
+      [cb = std::move(cb), deadline, ctx_raw, begin_commit_timestamp, 
metadata_pb]
+          (const Status& s) mutable {
         unique_ptr<TxnParticipantContext> unique_ctx(ctx_raw);
         if (PREDICT_FALSE(!s.ok())) {
           cb(s);
@@ -406,7 +410,8 @@ void TxnSystemClient::ParticipateInTransactionAsync(const 
string& tablet_id,
             std::move(unique_ctx),
             deadline,
             std::move(cb),
-            begin_commit_timestamp);
+            begin_commit_timestamp,
+            metadata_pb);
         rpc->SendRpc();
       });
 }
diff --git a/src/kudu/transactions/txn_system_client.h 
b/src/kudu/transactions/txn_system_client.h
index 6b4c193..6c919a8 100644
--- a/src/kudu/transactions/txn_system_client.h
+++ b/src/kudu/transactions/txn_system_client.h
@@ -52,6 +52,10 @@ namespace rpc {
 class Messenger;
 } // namespace rpc
 
+namespace tablet {
+class TxnMetadataPB;
+} // namespace tablet
+
 namespace tserver {
 class CoordinatorOpPB;
 class CoordinatorOpResultPB;
@@ -151,12 +155,14 @@ class TxnSystemClient {
   Status ParticipateInTransaction(const std::string& tablet_id,
                                   const tserver::ParticipantOpPB& 
participant_op,
                                   const MonoDelta& timeout,
-                                  Timestamp* begin_commit_timestamp = nullptr);
+                                  Timestamp* begin_commit_timestamp = nullptr,
+                                  tablet::TxnMetadataPB* metadata_pb = 
nullptr);
   void ParticipateInTransactionAsync(const std::string& tablet_id,
                                      tserver::ParticipantOpPB participant_op,
                                      const MonoDelta& timeout,
                                      StatusCallback cb,
-                                     Timestamp* begin_commit_timestamp = 
nullptr);
+                                     Timestamp* begin_commit_timestamp = 
nullptr,
+                                     tablet::TxnMetadataPB* metadata_pb = 
nullptr);
  private:
 
   friend class itest::TxnStatusTableITest;
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index 2b77a6a..eec5b3c 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -234,6 +234,7 @@ using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TabletStatePB;
+using kudu::tablet::TxnMetadataPB;
 using kudu::tablet::WriteAuthorizationContext;
 using kudu::tablet::WriteOpState;
 using kudu::tablet::WritePrivilegeType;
@@ -1336,6 +1337,15 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
 void TabletServiceAdminImpl::ParticipateInTransaction(const 
ParticipantRequestPB* req,
                                                       ParticipantResponsePB* 
resp,
                                                       RpcContext* context) {
+  if (!req->has_op() || !req->op().has_type() || !req->op().has_txn_id() ||
+      !req->has_tablet_id()) {
+    Status s = Status::InvalidArgument(
+        Substitute("Missing fields in request: $0", 
SecureShortDebugString(*req)));
+    SetupErrorAndRespond(resp->mutable_error(), s,
+                         TabletServerErrorPB::UNKNOWN_ERROR,
+                         context);
+    return;
+  }
   scoped_refptr<TabletReplica> replica;
   if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), 
req->tablet_id(), resp,
                                            context, &replica)) {
@@ -1348,6 +1358,23 @@ void 
TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB
     SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
     return;
   }
+  const auto& op = req->op();
+  if (op.type() == ParticipantOpPB::GET_METADATA) {
+    TxnMetadataPB pb;
+    const auto* meta = tablet->metadata();
+    if (meta->GetTxnMetadataPB(op.txn_id(), &pb)) {
+      *resp->mutable_metadata() = std::move(pb);
+      context->RespondSuccess();
+      return;
+    }
+    SetupErrorAndRespond(
+        resp->mutable_error(),
+        Status::InvalidArgument(Substitute("txn ID $0 has no metadata", 
op.txn_id())),
+        TabletServerErrorPB::UNKNOWN_ERROR,
+        context);
+    return;
+  }
+
   // TODO(awong): consider memory-based throttling?
   // TODO(awong): we should also persist the transaction's owner, and prevent
   // other users from mutating it.
diff --git a/src/kudu/tserver/tserver_admin.proto 
b/src/kudu/tserver/tserver_admin.proto
index 9572318..ea9435b 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -102,6 +102,9 @@ message ParticipantOpPB {
     BEGIN_COMMIT = 2;
     FINALIZE_COMMIT = 3;
     ABORT_TXN = 4;
+
+    // Get metadata about the given participant.
+    GET_METADATA = 5;
   }
   optional ParticipantOpType type = 1;
   optional int64 txn_id = 2;
@@ -120,6 +123,10 @@ message ParticipantResponsePB {
 
   // The timestamp chosen by the server for this participant op.
   optional fixed64 timestamp = 2;
+
+  // Metadata returned for a GET_METADATA op. May be absent if no metadata
+  // existed on the given participant for the given transaction.
+  optional tablet.TxnMetadataPB metadata = 3;
 }
 
 message AlterSchemaRequestPB {

Reply via email to