This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0875f79471a384efb28453a0aff98e5e2eb6c576 Author: Alexey Serbin <[email protected]> AuthorDate: Mon May 3 16:15:48 2021 -0700 KUDU-2612: update C++ client API to commit a transaction This patch updates the signature of the KuduTransaction::Commit() method to address recent feedback on the txn-related API. The idea is to make the API easier to use, since txn.Commit(false) looks a bit vague and might be confusing as well. In essence, the 'wait' parameter is gone and now there are two methods: * KuduTransaction::Commit() * KuduTransaction::StartCommit() The former starts committing a multi-row transaction and waits for the commit phase to finalize. The latter just starts the commit phase for a multi-row transaction and returns, not awaiting for the commit phase to finalize. Change-Id: Iecac338a753e559597a9348e68c9b09813cc8105 Reviewed-on: http://gerrit.cloudera.org:8080/17392 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/client/client-test.cc | 20 +++++++------- src/kudu/client/client.cc | 9 ++++-- src/kudu/client/client.h | 29 ++++++++++++-------- src/kudu/client/transaction-internal.cc | 4 +-- src/kudu/client/transaction-internal.h | 13 ++++++++- src/kudu/integration-tests/txn_commit-itest.cc | 32 +++++++++++----------- .../integration-tests/txn_status_manager-itest.cc | 2 +- src/kudu/integration-tests/txn_write_ops-itest.cc | 10 +++---- 8 files changed, 71 insertions(+), 48 deletions(-) diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 7ab3e85..990e3e2 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -7169,7 +7169,7 @@ TEST_F(ClientTest, TxnBasicOperations) { shared_ptr<KuduTransaction> txn; ASSERT_OK(client_->NewTransaction(&txn)); ASSERT_OK(txn->Rollback()); - auto s = txn->Commit(false /* wait */); + auto s = txn->StartCommit(); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORT"); } @@ -7256,7 +7256,7 @@ TEST_F(ClientTest, TxnCommit) { { shared_ptr<KuduTransaction> txn; ASSERT_OK(client_->NewTransaction(&txn)); - ASSERT_OK(txn->Commit(false /* wait */)); + ASSERT_OK(txn->StartCommit()); ASSERT_EVENTUALLY([&] { bool is_complete = false; Status cs; @@ -7376,7 +7376,7 @@ TEST_F(ClientTest, TxnToken) { shared_ptr<KuduSession> session; ASSERT_OK(serdes_txn->CreateSession(&session)); NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kNumRows)); - ASSERT_OK(serdes_txn->Commit(false /* wait */)); + ASSERT_OK(serdes_txn->StartCommit()); // The state of a transaction isn't stored in the token, so initiating // commit of the transaction doesn't change the result of the serialization. @@ -7438,7 +7438,7 @@ TEST_F(ClientTest, AttemptToControlTxnByOtherUser) { ASSERT_OK(KuduTransaction::Deserialize(client, txn_token, &serdes_txn)); const vector<pair<string, Status>> txn_ctl_results = { { "rollback", serdes_txn->Rollback() }, - { "commit", serdes_txn->Commit(false /* wait */) }, + { "commit", serdes_txn->StartCommit() }, }; for (const auto& op_and_status : txn_ctl_results) { SCOPED_TRACE(op_and_status.first); @@ -7468,7 +7468,7 @@ TEST_F(ClientTest, NoTxnManager) { const vector<pair<string, Status>> txn_ctl_results = { { "rollback", txn->Rollback() }, - { "commit", txn->Commit(false /* wait */) }, + { "commit", txn->StartCommit() }, }; for (const auto& op_and_status : txn_ctl_results) { SCOPED_TRACE(op_and_status.first); @@ -7537,7 +7537,7 @@ TEST_F(ClientTest, TxnKeepAlive) { // of the scope. shared_ptr<KuduTransaction> serdes_txn; ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn)); - auto s = serdes_txn->Commit(false /* wait */); + auto s = serdes_txn->StartCommit(); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT"); } @@ -7560,7 +7560,7 @@ TEST_F(ClientTest, TxnKeepAlive) { SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms)); - auto s = serdes_txn->Commit(false /* wait */); + auto s = serdes_txn->StartCommit(); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT"); } @@ -7626,7 +7626,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerShortTime) { // An attempt to commit a transaction should fail due to unreachable masters. { - auto s = txn->Commit(false /* wait */); + auto s = txn->StartCommit(); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); } @@ -7663,7 +7663,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) { // An attempt to commit a transaction should fail due to unreachable masters. { - auto s = txn->Commit(false /* wait */); + auto s = txn->StartCommit(); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); } @@ -7678,7 +7678,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) { // any txn keepalive messages for longer than prescribed by the // --txn_keepalive_interval_ms flag. { - auto s = txn->Commit(false /* wait */); + auto s = txn->StartCommit(); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT"); } diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 7246cab..dcae85c 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -141,6 +141,7 @@ using std::unique_ptr; using std::vector; using strings::Substitute; + MAKE_ENUM_LIMITS(kudu::client::KuduSession::FlushMode, kudu::client::KuduSession::AUTO_FLUSH_SYNC, kudu::client::KuduSession::MANUAL_FLUSH); @@ -439,8 +440,12 @@ Status KuduTransaction::CreateSession(sp::shared_ptr<KuduSession>* session) { return data_->CreateSession(session); } -Status KuduTransaction::Commit(bool wait) { - return data_->Commit(wait); +Status KuduTransaction::Commit() { + return data_->Commit(KuduTransaction::Data::CommitMode::WAIT_FOR_COMPLETION); +} + +Status KuduTransaction::StartCommit() { + return data_->Commit(KuduTransaction::Data::CommitMode::START_ONLY); } Status KuduTransaction::IsCommitComplete( diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 437b0c5..59aee75 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -380,18 +380,25 @@ class KUDU_EXPORT KuduTransaction : /// Commit the transaction. /// - /// This method initiates committing the transaction, and, depending on the - /// @c wait parameter, either returns right after that or awaits - /// for the commit to finalize. + /// This method initiates committing the transaction and then awaits + /// for the transaction's commit phase to finalize. /// - /// @param [in] wait - /// This parameter controls the way how this method operates: - /// @li @c true means synchronous operation mode - /// @li @c false means asynchronous operation mode - /// In case of asynchronous mode, @c KuduTransaction::IsCommitComplete() - /// can be used to detect whether the commit has successfully finalized. - /// @return Operation result status. - Status Commit(bool wait = true) WARN_UNUSED_RESULT; + /// @return Returns @c Status::OK() if all the stages of the transaction's + /// commit sequence were successful, i.e. the status of various pre-commit + /// work, the status of starting the commit phase, the status of the commit + /// phase itself once it's completed. Returns non-OK status of the very + /// first failed stage of the transaction's commit sequence. + Status Commit() WARN_UNUSED_RESULT; + + /// Start committing this transaction, but don't wait for the commit phase + /// to finalize. + /// + /// This method initiates the commit phase for this transaction, not awaiting + /// for the commit phase to finalize. To check for the transaction's commit + /// status, use the @c KuduTransaction::IsCommitComplete() method. + /// + /// @return Status of starting the commit phase for this transaction. + Status StartCommit() WARN_UNUSED_RESULT; /// Whether the commit has completed i.e. no longer in progress of finalizing. /// diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc index f702716..7113a50 100644 --- a/src/kudu/client/transaction-internal.cc +++ b/src/kudu/client/transaction-internal.cc @@ -161,7 +161,7 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn) return Status::OK(); } -Status KuduTransaction::Data::Commit(bool wait) { +Status KuduTransaction::Data::Commit(CommitMode mode) { DCHECK(txn_id_.IsValid()); auto c = weak_client_.lock(); if (!c) { @@ -185,7 +185,7 @@ Status KuduTransaction::Data::Commit(bool wait) { if (resp.has_error()) { return StatusFromPB(resp.error().status()); } - if (wait) { + if (mode == CommitMode::WAIT_FOR_COMPLETION) { RETURN_NOT_OK(WaitForTxnCommitToFinalize(c.get(), deadline, txn_id_)); } return Status::OK(); diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h index 9835f4a..f4471dd 100644 --- a/src/kudu/client/transaction-internal.h +++ b/src/kudu/client/transaction-internal.h @@ -59,8 +59,19 @@ class KuduTransaction::Data { Status CreateSession(sp::shared_ptr<KuduSession>* session); Status Begin(const sp::shared_ptr<KuduTransaction>& txn); - Status Commit(bool wait); + + // Transaction commit mode. + enum class CommitMode { + // Only start/initiate the commit phase, don't wait for the completion. + START_ONLY, + + // Start the commit phase and wait until it succeeds or fails. + WAIT_FOR_COMPLETION, + }; + + Status Commit(CommitMode mode); Status IsCommitComplete(bool* is_complete, Status* completion_status); + Status Rollback(); Status Serialize(std::string* serialized_txn, diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc index db2e913..5fba278 100644 --- a/src/kudu/integration-tests/txn_commit-itest.cc +++ b/src/kudu/integration-tests/txn_commit-itest.cc @@ -413,7 +413,7 @@ TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) { ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->DeleteTablet( tsm_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none)); @@ -429,7 +429,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) { ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); ASSERT_OK(client_->DeleteTable(table_name_)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); // The transaction should eventually fail, treating the deleted participant // as a fatal error. @@ -453,7 +453,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) { alterer->DropRangePartition(schema.NewRow(), schema.NewRow()); alterer.reset(); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); // The transaction should eventually abort, treating the deleted participant // as fatal, resulting in an aborted transaction. @@ -472,7 +472,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommitting) { shared_ptr<KuduSession> txn_session; ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); // Stop the tserver without allowing the finalize commit to complete. cluster_->mini_tablet_server(0)->Shutdown(); @@ -533,7 +533,7 @@ TEST_F(TxnCommitITest, TestAbortRacingWithBotchedCommit) { { table_name_, kSecondTableName })); ASSERT_OK(client_->DeleteTable(kSecondTableName)); FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000; - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); ASSERT_OK(txn->Rollback()); ASSERT_EVENTUALLY([&] { Status completion_status; @@ -588,7 +588,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) { { table_name_, kSecondTableName })); ASSERT_OK(client_->DeleteTable(kSecondTableName)); FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000; - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); // Shut down without giving time for the commit to complete. auto* mts = cluster_->mini_tablet_server(0); @@ -674,7 +674,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) { participant_ids_[0], op_pb, MonoDelta::FromSeconds(3))); // When we try to commit, we should end up not completing. - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); SleepFor(MonoDelta::FromSeconds(3)); Status completion_status; @@ -705,7 +705,7 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) { vector<Status> results(kNumTxns); for (int i = 0; i < kNumTxns; i++) { threads.emplace_back([&, i] { - results[i] = txns[i]->Commit(/*wait*/false); + results[i] = txns[i]->StartCommit(); }); } for (auto& t : threads) { @@ -751,7 +751,7 @@ TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) { for (int i = 0; i < kNumTxns; i++) { threads.emplace_back([&, i] { SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs)); - Status s = txns[i]->Commit(/*wait*/true); + Status s = txns[i]->Commit(); if (s.ok()) { num_committed_txns++; } @@ -805,7 +805,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) { vector<Status> results(kNumThreads); for (int i = 0; i < kNumThreads; i++) { threads.emplace_back([&, i] { - results[i] = txn->Commit(/*wait*/false); + results[i] = txn->StartCommit(); }); } for (auto& t : threads) { @@ -833,7 +833,7 @@ TEST_F(TxnCommitITest, TestDontBackgroundAbortIfCommitInProgress) { shared_ptr<KuduSession> txn_session; ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); ASSERT_OK(txn->Serialize(&serialized_txn)); } // Wait a bit to allow would-be background aborts to happen. @@ -861,7 +861,7 @@ TEST_F(TxnCommitITest, TestAbortIfCommitInProgress) { shared_ptr<KuduSession> txn_session; ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); ASSERT_OK(txn->Rollback()); ASSERT_EVENTUALLY([&] { Status completion_status; @@ -919,7 +919,7 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) { ASSERT_OK(BeginTransaction(&txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn)); prt_ts_->Shutdown(); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); // Since our participant is down, we can't proceed with the commit. Status completion_status; @@ -965,7 +965,7 @@ TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) { // Shut down our participant's tserver so our commit task keeps retrying. prt_ts_->Shutdown(); - ASSERT_OK(committed_txn->Commit(/*wait*/false)); + ASSERT_OK(committed_txn->StartCommit()); ASSERT_OK(aborted_txn->Rollback()); Status completion_status; @@ -1012,7 +1012,7 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhileShuttingDownTxnStatusManager) { shared_ptr<KuduSession> txn_session; ASSERT_OK(BeginTransaction(&txn, &txn_session)); - ASSERT_OK(txn->Commit(/*wait*/false)); + ASSERT_OK(txn->StartCommit()); tsm_ts_->Shutdown(); ASSERT_OK(tsm_ts_->Restart()); @@ -1064,7 +1064,7 @@ TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) { ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session)); ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn)); } - ASSERT_OK(committed_txn->Commit(/*wait*/ false)); + ASSERT_OK(committed_txn->StartCommit()); Status completion_status; bool is_complete = false; ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status)); diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc index baef6be..024942f 100644 --- a/src/kudu/integration-tests/txn_status_manager-itest.cc +++ b/src/kudu/integration-tests/txn_status_manager-itest.cc @@ -483,7 +483,7 @@ TEST_F(TxnStatusManagerITest, TxnKeptAliveByClientIfStatusManagerRestarted) { SleepFor(MonoDelta::FromMilliseconds(5 * kTxnKeepaliveIntervalMs)); - ASSERT_OK(txn->Commit(false /* wait */)); + ASSERT_OK(txn->StartCommit()); NO_FATALS(cluster_->AssertNoCrashes()); } diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc index 1aabbc1..5c238f9 100644 --- a/src/kudu/integration-tests/txn_write_ops-itest.cc +++ b/src/kudu/integration-tests/txn_write_ops-itest.cc @@ -338,7 +338,7 @@ TEST_F(TxnWriteOpsITest, CommitTimestampPropagation) { ASSERT_EQ(0, session->CountPendingErrors()); const auto ts_before_commit = client_->GetLatestObservedTimestamp(); - ASSERT_OK(txn->Commit(false)); + ASSERT_OK(txn->StartCommit()); const auto ts_after_commit_async = client_->GetLatestObservedTimestamp(); ASSERT_EQ(ts_before_commit, ts_after_commit_async); @@ -475,7 +475,7 @@ TEST_F(TxnWriteOpsITest, DeadlockPrevention) { } if (!needs_retry) { succeeded = true; - ASSERT_OK(txn->Commit(/*wait*/true)); + ASSERT_OK(txn->Commit()); } } }); @@ -579,7 +579,7 @@ TEST_F(TxnWriteOpsITest, FrequentElections) { CHECK_OK(session->Apply(op.release())); } if (iter % 8 == 0) { - CHECK_OK(txn->Commit(false/*wait*/)); + CHECK_OK(txn->StartCommit()); row_count += kNumRowsPerTxn; } else { CHECK_OK(txn->Rollback()); @@ -1881,7 +1881,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered SleepFor(MonoDelta::FromMilliseconds(kDelayMs)); // Initiate committing the transaction after the delay, but don't wait // for the commit to finalize. - commit_init_status = txn->Commit(false/*wait*/); + commit_init_status = txn->StartCommit(); }); auto cleanup = MakeScopedCleanup([&]() { committer.join(); @@ -1938,7 +1938,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) { SleepFor(MonoDelta::FromMilliseconds(kDelayMs)); // Initiate committing the transaction after the delay, but don't wait // for the commit to finalize. - commit_init_status = txn->Commit(false/*wait*/); + commit_init_status = txn->StartCommit(); }); auto cleanup = MakeScopedCleanup([&]() { committer.join();
