This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 20fde59 KUDU-2612 p2 (b): add transaction status retrieval
20fde59 is described below
commit 20fde59bca1f9df5a3cdee48f7794e0e8f16784a
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Sep 22 23:10:13 2020 -0700
KUDU-2612 p2 (b): add transaction status retrieval
After offline discussions with Andrew, it became clear that the TxnManager
should provide an asynchronous interface to commit a transaction, i.e.
something similar to CreateTable()/IsCreateTableDone(). To implement
that, the TxnManager needs to check the status of the transaction
after initiating the commit phase by issuing the corresponding call
to the TxnStatusManager (that's implemented as the CoordinateTransaction()
RPC to TabletServerAdminService with the BEGIN_COMMIT_TXN operation type).
This patch introduces the server-side piece required to retrieve
the status of a transaction from the TxnStatusManager. I'm
planning to introduce the corresponding bindings via the TxnSystemClient
in a separate changelist.
This is a follow-up to efd8c4f165460b7fa337b8ebd1856b10bc274311.
Change-Id: I45f099d943f2b7955d6d561a1cb883343c7b79a4
Reviewed-on: http://gerrit.cloudera.org:8080/16495
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
---
.../integration-tests/ts_tablet_manager-itest.cc | 4 +-
src/kudu/tablet/txn_coordinator.h | 10 ++
src/kudu/transactions/CMakeLists.txt | 10 +-
src/kudu/transactions/txn_status_manager-test.cc | 107 +++++++++++++++++++++
src/kudu/transactions/txn_status_manager.cc | 19 ++++
src/kudu/transactions/txn_status_manager.h | 6 ++
src/kudu/transactions/txn_status_tablet.h | 3 -
src/kudu/tserver/CMakeLists.txt | 1 +
src/kudu/tserver/tablet_service.cc | 9 ++
src/kudu/tserver/tserver_admin.proto | 9 ++
10 files changed, 169 insertions(+), 9 deletions(-)
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index f6cc980..17b97b2 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -1204,6 +1204,7 @@ TEST_F(TxnStatusTabletManagementTest,
TestTabletServerProxyCalls) {
CoordinatorOpPB::REGISTER_PARTICIPANT,
CoordinatorOpPB::BEGIN_COMMIT_TXN,
CoordinatorOpPB::ABORT_TXN,
+ CoordinatorOpPB::GET_TXN_STATUS,
};
// Perform the series of ops for the given transaction ID as the given user,
// erroring out if an unexpected result is received. If 'user' is empty, use
@@ -1230,7 +1231,8 @@ TEST_F(TxnStatusTabletManagementTest,
TestTabletServerProxyCalls) {
SCOPED_TRACE(SecureDebugString(resp));
if (expect_success) {
ASSERT_FALSE(resp.has_error());
- ASSERT_FALSE(resp.has_op_result());
+ ASSERT_EQ(op_type == CoordinatorOpPB::GET_TXN_STATUS,
+ resp.has_op_result());
} else {
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
diff --git a/src/kudu/tablet/txn_coordinator.h
b/src/kudu/tablet/txn_coordinator.h
index dd84415..33bcea2 100644
--- a/src/kudu/tablet/txn_coordinator.h
+++ b/src/kudu/tablet/txn_coordinator.h
@@ -24,6 +24,11 @@
#include "kudu/util/status.h"
namespace kudu {
+
+namespace transactions {
+class TxnStatusEntryPB;
+} // namespace transactions
+
namespace tserver {
class TabletServerErrorPB;
} // namespace tserver
@@ -77,6 +82,11 @@ class TxnCoordinator {
virtual Status AbortTransaction(int64_t txn_id, const std::string& user,
tserver::TabletServerErrorPB* ts_error) = 0;
+ // Retrieves the status entry for the specified transaction.
+ virtual Status GetTransactionStatus(int64_t txn_id,
+ const std::string& user,
+ transactions::TxnStatusEntryPB*
txn_status) = 0;
+
// Registers a participant tablet ID to the given transaction ID as the given
// user.
//
diff --git a/src/kudu/transactions/CMakeLists.txt
b/src/kudu/transactions/CMakeLists.txt
index 0d679cb..f533499 100644
--- a/src/kudu/transactions/CMakeLists.txt
+++ b/src/kudu/transactions/CMakeLists.txt
@@ -20,14 +20,14 @@ PROTOBUF_GENERATE_CPP(
SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
PROTO_FILES transactions.proto)
-
-add_library(transactions_proto
- ${TRANSACTIONS_PROTO_SRCS}
- ${TRANSACTIONS_PROTO_HDRS})
-target_link_libraries(transactions_proto
+set(TRANSACTIONS_PROTO_LIBS
protobuf
wire_protocol_proto
)
+ADD_EXPORTABLE_LIBRARY(transactions_proto
+ SRCS ${TRANSACTIONS_PROTO_SRCS}
+ DEPS ${TRANSACTIONS_PROTO_LIBS}
+ NONLINK_DEPS ${TRANSACTIONS_PROTO_TGTS})
set(TRANSACTIONS_SRCS
coordinator_rpc.cc
diff --git a/src/kudu/transactions/txn_status_manager-test.cc
b/src/kudu/transactions/txn_status_manager-test.cc
index e4dba88..9934d41 100644
--- a/src/kudu/transactions/txn_status_manager-test.cc
+++ b/src/kudu/transactions/txn_status_manager-test.cc
@@ -56,6 +56,8 @@
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::tablet::ParticipantIdsByTxnId;
using kudu::tablet::TabletReplicaTestBase;
+using kudu::transactions::TxnStatePB;
+using kudu::transactions::TxnStatusEntryPB;
using kudu::tserver::TabletServerErrorPB;
using std::string;
using std::thread;
@@ -350,6 +352,111 @@ TEST_F(TxnStatusManagerTest, TestUpdateStateConcurrently)
{
}
}
+// This test scenario verifies basic functionality of the
+// TxnStatusManager::GetTransactionStatus() method.
+TEST_F(TxnStatusManagerTest, GetTransactionStatus) {
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(1, kOwner, &ts_error));
+
+ TxnStatusEntryPB txn_status;
+ ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(1, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->FinalizeCommitTransaction(1));
+ ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+ }
+
+ {
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(2, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->AbortTransaction(2, kOwner, &ts_error));
+
+ TxnStatusEntryPB txn_status;
+ ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+ }
+
+ // Start another transaction and start its commit phase.
+ TabletServerErrorPB ts_error;
+ ASSERT_OK(txn_manager_->BeginTransaction(3, kOwner, &ts_error));
+ ASSERT_OK(txn_manager_->BeginCommitTransaction(3, kOwner, &ts_error));
+
+ // Start just another transaction.
+ ASSERT_OK(txn_manager_->BeginTransaction(4, kOwner, &ts_error));
+
+ // Make the TxnStatusManager start from scratch.
+ ASSERT_OK(RestartReplica());
+
+ // Committed, aborted, and in-flight transactions should be known to the
+ // TxnStatusManager even after restarting the underlying replica and
+ // rebuilding the TxnStatusManager from scratch.
+ {
+ TxnStatusEntryPB txn_status;
+ ASSERT_OK(txn_manager_->GetTransactionStatus(1, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::COMMITTED, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->GetTransactionStatus(2, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::ABORTED, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->GetTransactionStatus(3, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::COMMIT_IN_PROGRESS, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+
+ ASSERT_OK(txn_manager_->GetTransactionStatus(4, kOwner, &txn_status));
+ ASSERT_TRUE(txn_status.has_state());
+ ASSERT_EQ(TxnStatePB::OPEN, txn_status.state());
+ ASSERT_TRUE(txn_status.has_user());
+ ASSERT_EQ(kOwner, txn_status.user());
+ }
+
+ // Supplying wrong user.
+ {
+ TxnStatusEntryPB txn_status;
+ auto s = txn_manager_->GetTransactionStatus(1, "stranger", &txn_status);
+ ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+ }
+
+ // Supplying not-yet-used transaction ID.
+ {
+ TxnStatusEntryPB txn_status;
+ auto s = txn_manager_->GetTransactionStatus(0, kOwner, &txn_status);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+
+ // Supplying wrong user and not-yet-used transaction ID.
+ {
+ TxnStatusEntryPB txn_status;
+ auto s = txn_manager_->GetTransactionStatus(0, "stranger", &txn_status);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ }
+}
+
// Test that performing actions as the wrong user will return errors.
TEST_F(TxnStatusManagerTest, TestWrongUser) {
const string kWrongUser = "stranger";
diff --git a/src/kudu/transactions/txn_status_manager.cc
b/src/kudu/transactions/txn_status_manager.cc
index 070298f..84edfd7 100644
--- a/src/kudu/transactions/txn_status_manager.cc
+++ b/src/kudu/transactions/txn_status_manager.cc
@@ -24,6 +24,7 @@
#include <vector>
#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -217,6 +218,24 @@ Status TxnStatusManager::AbortTransaction(int64_t txn_id,
const std::string& use
return Status::OK();
}
+Status TxnStatusManager::GetTransactionStatus(
+ int64_t txn_id,
+ const std::string& user,
+ transactions::TxnStatusEntryPB* txn_status) {
+ DCHECK(txn_status);
+ scoped_refptr<TransactionEntry> txn;
+ RETURN_NOT_OK(GetTransaction(txn_id, user, &txn));
+
+ TransactionEntryLock txn_lock(txn.get(), LockMode::READ);
+ const auto& pb = txn_lock.data().pb;
+ DCHECK(pb.has_user());
+ txn_status->set_user(pb.user());
+ DCHECK(pb.has_state());
+ txn_status->set_state(pb.state());
+
+ return Status::OK();
+}
+
Status TxnStatusManager::RegisterParticipant(int64_t txn_id, const string&
tablet_id,
const string& user,
TabletServerErrorPB* ts_error) {
scoped_refptr<TransactionEntry> txn;
diff --git a/src/kudu/transactions/txn_status_manager.h
b/src/kudu/transactions/txn_status_manager.h
index 4036bb2..b6ee2ae 100644
--- a/src/kudu/transactions/txn_status_manager.h
+++ b/src/kudu/transactions/txn_status_manager.h
@@ -110,6 +110,12 @@ class TxnStatusManager : public tablet::TxnCoordinator {
Status AbortTransaction(int64_t txn_id, const std::string& user,
tserver::TabletServerErrorPB* ts_error) override;
+ // Retrieves the status of the specified transaction, returning an error if
+ // the transaction doesn't exist or isn't owned by the specified user.
+ Status GetTransactionStatus(int64_t txn_id,
+ const std::string& user,
+ transactions::TxnStatusEntryPB* txn_status)
override;
+
// Creates an in-memory participant, writes an entry to the status table, and
// attaches the in-memory participant to the transaction.
//
diff --git a/src/kudu/transactions/txn_status_tablet.h
b/src/kudu/transactions/txn_status_tablet.h
index 6ead705..d87dd9b 100644
--- a/src/kudu/transactions/txn_status_tablet.h
+++ b/src/kudu/transactions/txn_status_tablet.h
@@ -119,9 +119,6 @@ class TxnStatusTablet {
private:
friend class TxnStatusManager;
- tablet::TabletReplica* tablet_replica() const {
- return tablet_replica_;
- }
// Writes 'req' to the underlying tablet replica, populating 'ts_error' and
// returning non-OK if there was a problem replicating the request, or simply
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 40e1ac9..872c70a 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -69,6 +69,7 @@ set(TSERVER_ADMIN_KRPC_LIBS
krpc
protobuf
rpc_header_proto
+ transactions_proto
tserver_proto
wire_protocol_proto)
ADD_EXPORTABLE_LIBRARY(tserver_admin_proto
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index a816ccf..ea20845 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -86,6 +86,7 @@
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_coordinator.h"
+#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/tserver/tablet_server.h"
@@ -1194,6 +1195,7 @@ Status ValidateCoordinatorOpFields(const CoordinatorOpPB&
op) {
case CoordinatorOpPB::BEGIN_TXN:
case CoordinatorOpPB::BEGIN_COMMIT_TXN:
case CoordinatorOpPB::ABORT_TXN:
+ case CoordinatorOpPB::GET_TXN_STATUS:
if (!op.has_txn_id()) {
return Status::InvalidArgument(Substitute("Missing txn id: $0",
SecureShortDebugString(op)));
@@ -1244,6 +1246,7 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
// Catch any replication errors in this 'ts_error' so we can return an
// appropriate error to the caller if need be.
TabletServerErrorPB ts_error;
+ transactions::TxnStatusEntryPB txn_status;
const auto& user = op.user();
const auto& txn_id = op.txn_id();
switch (op.type()) {
@@ -1259,6 +1262,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
case CoordinatorOpPB::ABORT_TXN:
s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
break;
+ case CoordinatorOpPB::GET_TXN_STATUS:
+ s = txn_coordinator->GetTransactionStatus(txn_id, user, &txn_status);
+ break;
default:
s = Status::InvalidArgument(Substitute("Unknown op type: $0",
op.type()));
}
@@ -1270,6 +1276,9 @@ void TabletServiceAdminImpl::CoordinateTransaction(const
CoordinateTransactionRe
// From here on out, errors are considered application errors.
if (PREDICT_FALSE(!s.ok())) {
StatusToPB(s, resp->mutable_op_result()->mutable_op_error());
+ } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
+ // Populate corresponding field in the response.
+ *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
}
context->RespondSuccess();
}
diff --git a/src/kudu/tserver/tserver_admin.proto
b/src/kudu/tserver/tserver_admin.proto
index e7e2177..a8ba20a 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -24,6 +24,7 @@ import "kudu/common/wire_protocol.proto";
import "kudu/consensus/metadata.proto";
import "kudu/rpc/rpc_header.proto";
import "kudu/tablet/metadata.proto";
+import "kudu/transactions/transactions.proto";
import "kudu/tserver/tserver.proto";
message CoordinatorOpPB {
@@ -33,6 +34,7 @@ message CoordinatorOpPB {
REGISTER_PARTICIPANT = 2;
BEGIN_COMMIT_TXN = 3;
ABORT_TXN = 4;
+ GET_TXN_STATUS = 5;
}
optional CoordinatorOpType type = 1;
optional int64 txn_id = 2;
@@ -47,6 +49,13 @@ message CoordinatorOpPB {
message CoordinatorOpResultPB {
optional AppStatusPB op_error = 1;
+ // The status of the transaction as seen at the moment when the request was
+ // processed. Populated only if responding to a request of the GET_TXN_STATUS
+ // operation type.
+ // TODO(aserbin): does it make sense to populate this with the current status
+ // of the transaction for other operation types?
+ optional transactions.TxnStatusEntryPB txn_status = 2;
+
// TODO(awong): populate this with some application-level results, like the
// actual transaction ID assigned, the next highest transaction ID available,
// etc.