This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 20fde59  KUDU-2612 p2 (b): add transaction status retrieval
20fde59 is described below

commit 20fde59bca1f9df5a3cdee48f7794e0e8f16784a
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Sep 22 23:10:13 2020 -0700

    KUDU-2612 p2 (b): add transaction status retrieval
    
    After offline discussions with Andrew, it became clear that TxnManager
    should provide an asynchronous interface to commit a transaction, i.e.
    something similar to CreateTable()/IsCreateTableDone().  To implement
    that, the TxnManager needs to check for the status of the transaction
    after initiating the commit phase by issuing corresponding call
    to TxnStatusManager (that's implemented as CoordinateTransaction() RPC
    to TabletServerAdminService with BEGIN_COMMIT_TXN operation type).
    
    This patch introduces the required server-side piece to retrieve the
    information on a transaction status from the TxnStatusManager.  I'm
    planning to introduce corresponding bindings via the TxnSystemClient
    in a separate changelist.
    
    This is a follow-up to efd8c4f165460b7fa337b8ebd1856b10bc274311.
    
    Change-Id: I45f099d943f2b7955d6d561a1cb883343c7b79a4
    Reviewed-on: http://gerrit.cloudera.org:8080/16495
    Reviewed-by: Andrew Wong <[email protected]>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/ts_tablet_manager-itest.cc   |   4 +-
 src/kudu/tablet/txn_coordinator.h                  |  10 ++
 src/kudu/transactions/CMakeLists.txt               |  10 +-
 src/kudu/transactions/txn_status_manager-test.cc   | 107 +++++++++++++++++++++
 src/kudu/transactions/txn_status_manager.cc        |  19 ++++
 src/kudu/transactions/txn_status_manager.h         |   6 ++
 src/kudu/transactions/txn_status_tablet.h          |   3 -
 src/kudu/tserver/CMakeLists.txt                    |   1 +
 src/kudu/tserver/tablet_service.cc                 |   9 ++
 src/kudu/tserver/tserver_admin.proto               |   9 ++
 10 files changed, 169 insertions(+), 9 deletions(-)

diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc 
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index f6cc980..17b97b2 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1204,6 +1204,7 @@ TEST_F(TxnStatusTabletManagementTest, 
TestTabletServerProxyCalls) {
     CoordinatorOpPB::REGISTER_PARTICIPANT,
     CoordinatorOpPB::BEGIN_COMMIT_TXN,
     CoordinatorOpPB::ABORT_TXN,
+    CoordinatorOpPB::GET_TXN_STATUS,
   };
   // Perform the series of ops for the given transaction ID as the given user,
   // erroring out if an unexpected result is received. If 'user' is empty, use
@@ -1230,7 +1231,8 @@ TEST_F(TxnStatusTabletManagementTest, 
TestTabletServerProxyCalls) {
       SCOPED_TRACE(SecureDebugString(resp));
       if (expect_success) {
         ASSERT_FALSE(resp.has_error());
-        ASSERT_FALSE(resp.has_op_result());
+        ASSERT_EQ(op_type == CoordinatorOpPB::GET_TXN_STATUS,
+                  resp.has_op_result());
       } else {
         ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
         ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
diff --git a/src/kudu/tablet/txn_coordinator.h 
b/src/kudu/tablet/txn_coordinator.h
index dd84415..33bcea2 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -24,6 +24,11 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+namespace transactions {
+class TxnStatusEntryPB;
+} // namespace transactions
+
 namespace tserver {
 class TabletServerErrorPB;
 } // namespace tserver
@@ -77,6 +82,11 @@ class TxnCoordinator {
   virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
                                   tserver::TabletServerErrorPB* ts_error) = 0;
 
+  // Retrieves the status entry for the specified transaction.
+  virtual Status GetTransactionStatus(int64_t txn_id,
+                                      const std::string& user,
+                                      transactions::TxnStatusEntryPB* 
txn_status) = 0;
+
   // Registers a participant tablet ID to the given transaction ID as the given
   // user.
   //
diff --git a/src/kudu/transactions/CMakeLists.txt 
b/src/kudu/transactions/CMakeLists.txt
index 0d679cb..f533499 100644
--- a/src/kudu/transactions/CMakeLists.txt
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -20,14 +20,14 @@ PROTOBUF_GENERATE_CPP(
   SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
   PROTO_FILES transactions.proto)
-
-add_library(transactions_proto
-  ${TRANSACTIONS_PROTO_SRCS}
-  ${TRANSACTIONS_PROTO_HDRS})
-target_link_libraries(transactions_proto
+set(TRANSACTIONS_PROTO_LIBS
   protobuf
   wire_protocol_proto
 )
+ADD_EXPORTABLE_LIBRARY(transactions_proto
+  SRCS ${TRANSACTIONS_PROTO_SRCS}
+  DEPS ${TRANSACTIONS_PROTO_LIBS}
+  NONLINK_DEPS ${TRANSACTIONS_PROTO_TGTS})
 
 set(TRANSACTIONS_SRCS
   coordinator_rpc.cc
diff --git a/src/kudu/transactions/txn_status_manager-test.cc 
b/src/kudu/transactions/txn_status_manager-test.cc
index e4dba88..9934d41 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -56,6 +56,8 @@
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::tablet::ParticipantIdsByTxnId;
 using kudu::tablet::TabletReplicaTestBase;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnStatusEntryPB;
 using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::thread;
@@ -350,6 +352,111 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently) 
{
   }
 }
 
+// This test scenario verifies basic functionality of the
+// TxnStatusManager::GetTransactionStatus() method.
+TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, &ts_error));
+
+    TxnStatusEntryPB txn_status;
+    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1));
+    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+  }
+
+  {
+    TabletServerErrorPB ts_error;
+    ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, &ts_error));
+    ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+
+    TxnStatusEntryPB txn_status;
+    ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+  }
+
+  // Start another transaction and start its commit phase.
+  TabletServerErrorPB ts_error;
+  ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, &ts_error));
+  ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
+
+  // Start just another transaction.
+  ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, &ts_error));
+
+  // Make the TxnStatusManager start from scratch.
+  ASSERT_OK(RestartReplica());
+
+  // Committed, aborted, and in-flight transactions should be known to the
+  // TxnStatusManager even after restarting the underlying replica and
+  // rebuilding the TxnStatusManager from scratch.
+  {
+    TxnStatusEntryPB txn_status;
+    ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->GetTransactionStatus(3, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+
+    ASSERT_OK(txn_manager_->GetTransactionStatus(4, kOwner, &txn_status));
+    ASSERT_TRUE(txn_status.has_state());
+    ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
+    ASSERT_TRUE(txn_status.has_user());
+    ASSERT_EQ(kOwner, txn_status.user());
+  }
+
+  // Supplying wrong user.
+  {
+    TxnStatusEntryPB txn_status;
+    auto s = txn_manager_->GetTransactionStatus(1, "stranger", &txn_status);
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  }
+
+  // Supplying not-yet-used transaction ID.
+  {
+    TxnStatusEntryPB txn_status;
+    auto s = txn_manager_->GetTransactionStatus(0, kOwner, &txn_status);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+  // Supplying wrong user and not-yet-used transaction ID.
+  {
+    TxnStatusEntryPB txn_status;
+    auto s = txn_manager_->GetTransactionStatus(0, "stranger", &txn_status);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+}
+
 // Test that performing actions as the wrong user will return errors.
 TEST_F(TxnStatusManagerTest, TestWrongUser) {
   const string kWrongUser = "stranger";
diff --git a/src/kudu/transactions/txn_status_manager.cc 
b/src/kudu/transactions/txn_status_manager.cc
index 070298f..84edfd7 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -217,6 +218,24 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id, 
const std::string& use
   return Status::OK();
 }
 
+Status TxnStatusManager::GetTransactionStatus(
+    int64_t txn_id,
+    const std::string& user,
+    transactions::TxnStatusEntryPB* txn_status) {
+  DCHECK(txn_status);
+  scoped_refptr<TransactionEntry> txn;
+  RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+
+  TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
+  const auto& pb = txn_lock.data().pb;
+  DCHECK(pb.has_user());
+  txn_status->set_user(pb.user());
+  DCHECK(pb.has_state());
+  txn_status->set_state(pb.state());
+
+  return Status::OK();
+}
+
 Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string& 
tablet_id,
                                              const string& user, 
TabletServerErrorPB* ts_error) {
   scoped_refptr<TransactionEntry> txn;
diff --git a/src/kudu/transactions/txn_status_manager.h 
b/src/kudu/transactions/txn_status_manager.h
index 4036bb2..b6ee2ae 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -110,6 +110,12 @@ class TxnStatusManager : public tablet::TxnCoordinator {
   Status AbortTransaction(int64_t txn_id, const std::string& user,
                           tserver::TabletServerErrorPB* ts_error) override;
 
+  // Retrieves the status of the specified transaction, returning an error if
+  // the transaction doesn't exist or isn't owned by the specified user.
+  Status GetTransactionStatus(int64_t txn_id,
+                              const std::string& user,
+                              transactions::TxnStatusEntryPB* txn_status) 
override;
+
   // Creates an in-memory participant, writes an entry to the status table, and
   // attaches the in-memory participant to the transaction.
   //
diff --git a/src/kudu/transactions/txn_status_tablet.h 
b/src/kudu/transactions/txn_status_tablet.h
index 6ead705..d87dd9b 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -119,9 +119,6 @@ class TxnStatusTablet {
 
  private:
   friend class TxnStatusManager;
-  tablet::TabletReplica* tablet_replica() const {
-    return tablet_replica_;
-  }
 
   // Writes 'req' to the underlying tablet replica, populating 'ts_error' and
   // returning non-OK if there was a problem replicating the request, or simply
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 40e1ac9..872c70a 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -69,6 +69,7 @@ set(TSERVER_ADMIN_KRPC_LIBS
   krpc
   protobuf
   rpc_header_proto
+  transactions_proto
   tserver_proto
   wire_protocol_proto)
 ADD_EXPORTABLE_LIBRARY(tserver_admin_proto
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index a816ccf..ea20845 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -86,6 +86,7 @@
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/transactions.pb.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tablet_server.h"
@@ -1194,6 +1195,7 @@ Status ValidateCoordinatorOpFields(const CoordinatorOpPB& 
op) {
     case CoordinatorOpPB::BEGIN_TXN:
     case CoordinatorOpPB::BEGIN_COMMIT_TXN:
     case CoordinatorOpPB::ABORT_TXN:
+    case CoordinatorOpPB::GET_TXN_STATUS:
       if (!op.has_txn_id()) {
         return Status::InvalidArgument(Substitute("Missing txn id: $0",
                                                   SecureShortDebugString(op)));
@@ -1244,6 +1246,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
   // Catch any replication errors in this 'ts_error' so we can return an
   // appropriate error to the caller if need be.
   TabletServerErrorPB ts_error;
+  transactions::TxnStatusEntryPB txn_status;
   const auto& user = op.user();
   const auto& txn_id = op.txn_id();
   switch (op.type()) {
@@ -1259,6 +1262,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
     case CoordinatorOpPB::ABORT_TXN:
       s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
       break;
+    case CoordinatorOpPB::GET_TXN_STATUS:
+      s = txn_coordinator->GetTransactionStatus(txn_id, user, &txn_status);
+      break;
     default:
       s = Status::InvalidArgument(Substitute("Unknown op type: $0", 
op.type()));
   }
@@ -1270,6 +1276,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const 
CoordinateTransactionRe
   // From here on out, errors are considered application errors.
   if (PREDICT_FALSE(!s.ok())) {
     StatusToPB(s, resp->mutable_op_result()->mutable_op_error());
+  } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
+    // Populate corresponding field in the response.
+    *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
   }
   context->RespondSuccess();
 }
diff --git a/src/kudu/tserver/tserver_admin.proto 
b/src/kudu/tserver/tserver_admin.proto
index e7e2177..a8ba20a 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -24,6 +24,7 @@ import "kudu/common/wire_protocol.proto";
 import "kudu/consensus/metadata.proto";
 import "kudu/rpc/rpc_header.proto";
 import "kudu/tablet/metadata.proto";
+import "kudu/transactions/transactions.proto";
 import "kudu/tserver/tserver.proto";
 
 message CoordinatorOpPB {
@@ -33,6 +34,7 @@ message CoordinatorOpPB {
     REGISTER_PARTICIPANT = 2;
     BEGIN_COMMIT_TXN = 3;
     ABORT_TXN = 4;
+    GET_TXN_STATUS = 5;
   }
   optional CoordinatorOpType type = 1;
   optional int64 txn_id = 2;
@@ -47,6 +49,13 @@ message CoordinatorOpPB {
 message CoordinatorOpResultPB {
   optional AppStatusPB op_error = 1;
 
+  // The status of the transaction as seen at the moment when the request was
+  // processed. Populated only if responding to a request of the GET_TXN_STATUS
+  // operation type.
+  // TODO(aserbin): does it make sense to populate this with the current status
+  //                of the transaction for other operation types?
+  optional transactions.TxnStatusEntryPB txn_status = 2;
+
   // TODO(awong): populate this with some application-level results, like the
   // actual transaction ID assigned, the next highest transaction ID available,
   // etc.

Reply via email to