This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0875f79471a384efb28453a0aff98e5e2eb6c576
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon May 3 16:15:48 2021 -0700

    KUDU-2612: update C++ client API to commit a transaction
    
    This patch updates the signature of the KuduTransaction::Commit()
    method to address recent feedback on the txn-related API. The idea is
    to make the API easier to use, since txn.Commit(false) looks a bit vague
    and might be confusing as well.
    
    In essence, the 'wait' parameter is gone and now there are two methods:
      * KuduTransaction::Commit()
      * KuduTransaction::StartCommit()
    
    The former starts committing a multi-row transaction and waits for the
    commit phase to finalize.  The latter just starts the commit phase for
    a multi-row transaction and returns without waiting for the commit phase
    to finalize.
    
    Change-Id: Iecac338a753e559597a9348e68c9b09813cc8105
    Reviewed-on: http://gerrit.cloudera.org:8080/17392
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/client/client-test.cc                     | 20 +++++++-------
 src/kudu/client/client.cc                          |  9 ++++--
 src/kudu/client/client.h                           | 29 ++++++++++++--------
 src/kudu/client/transaction-internal.cc            |  4 +--
 src/kudu/client/transaction-internal.h             | 13 ++++++++-
 src/kudu/integration-tests/txn_commit-itest.cc     | 32 +++++++++++-----------
 .../integration-tests/txn_status_manager-itest.cc  |  2 +-
 src/kudu/integration-tests/txn_write_ops-itest.cc  | 10 +++----
 8 files changed, 71 insertions(+), 48 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7ab3e85..990e3e2 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -7169,7 +7169,7 @@ TEST_F(ClientTest, TxnBasicOperations) {
     shared_ptr<KuduTransaction> txn;
     ASSERT_OK(client_->NewTransaction(&txn));
     ASSERT_OK(txn->Rollback());
-    auto s = txn->Commit(false /* wait */);
+    auto s = txn->StartCommit();
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "is not open: state: ABORT");
   }
@@ -7256,7 +7256,7 @@ TEST_F(ClientTest, TxnCommit) {
   {
     shared_ptr<KuduTransaction> txn;
     ASSERT_OK(client_->NewTransaction(&txn));
-    ASSERT_OK(txn->Commit(false /* wait */));
+    ASSERT_OK(txn->StartCommit());
     ASSERT_EVENTUALLY([&] {
       bool is_complete = false;
       Status cs;
@@ -7376,7 +7376,7 @@ TEST_F(ClientTest, TxnToken) {
     shared_ptr<KuduSession> session;
     ASSERT_OK(serdes_txn->CreateSession(&session));
     NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kNumRows));
-    ASSERT_OK(serdes_txn->Commit(false /* wait */));
+    ASSERT_OK(serdes_txn->StartCommit());
 
     // The state of a transaction isn't stored in the token, so initiating
     // commit of the transaction doesn't change the result of the serialization.
@@ -7438,7 +7438,7 @@ TEST_F(ClientTest, AttemptToControlTxnByOtherUser) {
   ASSERT_OK(KuduTransaction::Deserialize(client, txn_token, &serdes_txn));
   const vector<pair<string, Status>> txn_ctl_results = {
     { "rollback", serdes_txn->Rollback() },
-    { "commit", serdes_txn->Commit(false /* wait */) },
+    { "commit", serdes_txn->StartCommit() },
   };
   for (const auto& op_and_status : txn_ctl_results) {
     SCOPED_TRACE(op_and_status.first);
@@ -7468,7 +7468,7 @@ TEST_F(ClientTest, NoTxnManager) {
 
   const vector<pair<string, Status>> txn_ctl_results = {
     { "rollback", txn->Rollback() },
-    { "commit", txn->Commit(false /* wait */) },
+    { "commit", txn->StartCommit() },
   };
   for (const auto& op_and_status : txn_ctl_results) {
     SCOPED_TRACE(op_and_status.first);
@@ -7537,7 +7537,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
     // of the scope.
     shared_ptr<KuduTransaction> serdes_txn;
     ASSERT_OK(KuduTransaction::Deserialize(client_, txn_token, &serdes_txn));
-    auto s = serdes_txn->Commit(false /* wait */);
+    auto s = serdes_txn->StartCommit();
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
@@ -7560,7 +7560,7 @@ TEST_F(ClientTest, TxnKeepAlive) {
 
     SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_txn_keepalive_interval_ms));
 
-    auto s = serdes_txn->Commit(false /* wait */);
+    auto s = serdes_txn->StartCommit();
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
@@ -7626,7 +7626,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerShortTime) {
 
   // An attempt to commit a transaction should fail due to unreachable masters.
   {
-    auto s = txn->Commit(false /* wait */);
+    auto s = txn->StartCommit();
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
@@ -7663,7 +7663,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) {
 
   // An attempt to commit a transaction should fail due to unreachable masters.
   {
-    auto s = txn->Commit(false /* wait */);
+    auto s = txn->StartCommit();
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
@@ -7678,7 +7678,7 @@ TEST_F(ClientTest, TxnKeepAliveAndUnavailableTxnManagerLongTime) {
   // any txn keepalive messages for longer than prescribed by the
   // --txn_keepalive_interval_ms flag.
   {
-    auto s = txn->Commit(false /* wait */);
+    auto s = txn->StartCommit();
     ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "not open: state: ABORT");
   }
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 7246cab..dcae85c 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -141,6 +141,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+
 MAKE_ENUM_LIMITS(kudu::client::KuduSession::FlushMode,
                  kudu::client::KuduSession::AUTO_FLUSH_SYNC,
                  kudu::client::KuduSession::MANUAL_FLUSH);
@@ -439,8 +440,12 @@ Status KuduTransaction::CreateSession(sp::shared_ptr<KuduSession>* session) {
   return data_->CreateSession(session);
 }
 
-Status KuduTransaction::Commit(bool wait) {
-  return data_->Commit(wait);
+Status KuduTransaction::Commit() {
+  return data_->Commit(KuduTransaction::Data::CommitMode::WAIT_FOR_COMPLETION);
+}
+
+Status KuduTransaction::StartCommit() {
+  return data_->Commit(KuduTransaction::Data::CommitMode::START_ONLY);
 }
 
 Status KuduTransaction::IsCommitComplete(
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 437b0c5..59aee75 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -380,18 +380,25 @@ class KUDU_EXPORT KuduTransaction :
 
   /// Commit the transaction.
   ///
-  /// This method initiates committing the transaction, and, depending on the
-  /// @c wait parameter, either returns right after that or awaits
-  /// for the commit to finalize.
+  /// This method initiates committing the transaction and then awaits
+  /// for the transaction's commit phase to finalize.
   ///
-  /// @param [in] wait
-  ///   This parameter controls the way how this method operates:
-  ///     @li @c true means synchronous operation mode
-  ///     @li @c false means asynchronous operation mode
-  ///   In case of asynchronous mode, @c KuduTransaction::IsCommitComplete()
-  ///   can be used to detect whether the commit has successfully finalized.
-  /// @return Operation result status.
-  Status Commit(bool wait = true) WARN_UNUSED_RESULT;
+  /// @return Returns @c Status::OK() if all the stages of the transaction's
+  ///   commit sequence were successful, i.e. the status of various pre-commit
+  ///   work, the status of starting the commit phase, the status of the commit
+  ///   phase itself once it's completed. Returns non-OK status of the very
+  ///   first failed stage of the transaction's commit sequence.
+  Status Commit() WARN_UNUSED_RESULT;
+
+  /// Start committing this transaction, but don't wait for the commit phase
+  /// to finalize.
+  ///
+  /// This method initiates the commit phase for this transaction, not awaiting
+  /// for the commit phase to finalize. To check for the transaction's commit
+  /// status, use the @c KuduTransaction::IsCommitComplete() method.
+  ///
+  /// @return Status of starting the commit phase for this transaction.
+  Status StartCommit() WARN_UNUSED_RESULT;
 
   /// Whether the commit has completed i.e. no longer in progress of finalizing.
   ///
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index f702716..7113a50 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -161,7 +161,7 @@ Status KuduTransaction::Data::Begin(const sp::shared_ptr<KuduTransaction>& txn)
   return Status::OK();
 }
 
-Status KuduTransaction::Data::Commit(bool wait) {
+Status KuduTransaction::Data::Commit(CommitMode mode) {
   DCHECK(txn_id_.IsValid());
   auto c = weak_client_.lock();
   if (!c) {
@@ -185,7 +185,7 @@ Status KuduTransaction::Data::Commit(bool wait) {
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
-  if (wait) {
+  if (mode == CommitMode::WAIT_FOR_COMPLETION) {
     RETURN_NOT_OK(WaitForTxnCommitToFinalize(c.get(), deadline, txn_id_));
   }
   return Status::OK();
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
index 9835f4a..f4471dd 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -59,8 +59,19 @@ class KuduTransaction::Data {
   Status CreateSession(sp::shared_ptr<KuduSession>* session);
 
   Status Begin(const sp::shared_ptr<KuduTransaction>& txn);
-  Status Commit(bool wait);
+
+  // Transaction commit mode.
+  enum class CommitMode {
+    // Only start/initiate the commit phase, don't wait for the completion.
+    START_ONLY,
+
+    // Start the commit phase and wait until it succeeds or fails.
+    WAIT_FOR_COMPLETION,
+  };
+
+  Status Commit(CommitMode mode);
   Status IsCommitComplete(bool* is_complete, Status* completion_status);
+
   Status Rollback();
 
   Status Serialize(std::string* serialized_txn,
diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc
index db2e913..5fba278 100644
--- a/src/kudu/integration-tests/txn_commit-itest.cc
+++ b/src/kudu/integration-tests/txn_commit-itest.cc
@@ -413,7 +413,7 @@ TEST_F(TxnCommitITest, TestCommitWhileDeletingTxnStatusManager) {
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
 
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
   ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->DeleteTablet(
       tsm_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none));
 
@@ -429,7 +429,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDeletingParticipant) {
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   ASSERT_OK(client_->DeleteTable(table_name_));
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
 
   // The transaction should eventually fail, treating the deleted participant
   // as a fatal error.
@@ -453,7 +453,7 @@ TEST_F(TxnCommitITest, TestCommitAfterDroppingRangeParticipant) {
   alterer->DropRangePartition(schema.NewRow(), schema.NewRow());
   alterer.reset();
 
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
 
   // The transaction should eventually abort, treating the deleted participant
   // as fatal, resulting in an aborted transaction.
@@ -472,7 +472,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommitting) {
   shared_ptr<KuduSession> txn_session;
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
   // Stop the tserver without allowing the finalize commit to complete.
   cluster_->mini_tablet_server(0)->Shutdown();
 
@@ -533,7 +533,7 @@ TEST_F(TxnCommitITest, TestAbortRacingWithBotchedCommit) {
                             { table_name_, kSecondTableName }));
   ASSERT_OK(client_->DeleteTable(kSecondTableName));
   FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 2000;
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
   ASSERT_OK(txn->Rollback());
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
@@ -588,7 +588,7 @@ TEST_F(TxnCommitITest, TestRestartingWhileCommittingAndDeleting) {
                             { table_name_, kSecondTableName }));
   ASSERT_OK(client_->DeleteTable(kSecondTableName));
   FLAGS_txn_status_manager_inject_latency_finalize_commit_ms = 10000;
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
 
   // Shut down without giving time for the commit to complete.
   auto* mts = cluster_->mini_tablet_server(0);
@@ -674,7 +674,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) {
       participant_ids_[0], op_pb, MonoDelta::FromSeconds(3)));
 
   // When we try to commit, we should end up not completing.
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
 
   SleepFor(MonoDelta::FromSeconds(3));
   Status completion_status;
@@ -705,7 +705,7 @@ TEST_F(TxnCommitITest, TestConcurrentCommitCalls) {
   vector<Status> results(kNumTxns);
   for (int i = 0; i < kNumTxns; i++) {
     threads.emplace_back([&, i] {
-      results[i] = txns[i]->Commit(/*wait*/false);
+      results[i] = txns[i]->StartCommit();
     });
   }
   for (auto& t : threads) {
@@ -751,7 +751,7 @@ TEST_F(TxnCommitITest, TestConcurrentAbortsAndCommits) {
   for (int i = 0; i < kNumTxns; i++) {
     threads.emplace_back([&, i] {
       SleepFor(MonoDelta::FromMilliseconds(rand() % kMaxSleepMs));
-      Status s = txns[i]->Commit(/*wait*/true);
+      Status s = txns[i]->Commit();
       if (s.ok()) {
         num_committed_txns++;
       }
@@ -805,7 +805,7 @@ TEST_F(TxnCommitITest, TestConcurrentRepeatedCommitCalls) {
   vector<Status> results(kNumThreads);
   for (int i = 0; i < kNumThreads; i++) {
     threads.emplace_back([&, i] {
-      results[i] = txn->Commit(/*wait*/false);
+      results[i] = txn->StartCommit();
     });
   }
   for (auto& t : threads) {
@@ -833,7 +833,7 @@ TEST_F(TxnCommitITest, TestDontBackgroundAbortIfCommitInProgress) {
     shared_ptr<KuduSession> txn_session;
     ASSERT_OK(BeginTransaction(&txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
-    ASSERT_OK(txn->Commit(/*wait*/false));
+    ASSERT_OK(txn->StartCommit());
     ASSERT_OK(txn->Serialize(&serialized_txn));
   }
   // Wait a bit to allow would-be background aborts to happen.
@@ -861,7 +861,7 @@ TEST_F(TxnCommitITest, TestAbortIfCommitInProgress) {
   shared_ptr<KuduSession> txn_session;
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
   ASSERT_OK(txn->Rollback());
   ASSERT_EVENTUALLY([&] {
     Status completion_status;
@@ -919,7 +919,7 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhenParticipantsAreDown) {
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
   ASSERT_OK(InsertToSession(txn_session, initial_row_count_, kNumRowsPerTxn));
   prt_ts_->Shutdown();
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
 
   // Since our participant is down, we can't proceed with the commit.
   Status completion_status;
@@ -965,7 +965,7 @@ TEST_F(TwoNodeTxnCommitITest, TestStartTasksDuringStartup) {
 
   // Shut down our participant's tserver so our commit task keeps retrying.
   prt_ts_->Shutdown();
-  ASSERT_OK(committed_txn->Commit(/*wait*/false));
+  ASSERT_OK(committed_txn->StartCommit());
   ASSERT_OK(aborted_txn->Rollback());
 
   Status completion_status;
@@ -1012,7 +1012,7 @@ TEST_F(TwoNodeTxnCommitITest, TestCommitWhileShuttingDownTxnStatusManager) {
   shared_ptr<KuduSession> txn_session;
   ASSERT_OK(BeginTransaction(&txn, &txn_session));
 
-  ASSERT_OK(txn->Commit(/*wait*/false));
+  ASSERT_OK(txn->StartCommit());
   tsm_ts_->Shutdown();
   ASSERT_OK(tsm_ts_->Restart());
 
@@ -1064,7 +1064,7 @@ TEST_F(ThreeNodeTxnCommitITest, TestCommitTasksReloadOnLeadershipChange) {
     ASSERT_OK(BeginTransaction(&aborted_txn, &txn_session));
     ASSERT_OK(InsertToSession(txn_session, initial_row_count_ + kNumRowsPerTxn, kNumRowsPerTxn));
   }
-  ASSERT_OK(committed_txn->Commit(/*wait*/ false));
+  ASSERT_OK(committed_txn->StartCommit());
   Status completion_status;
   bool is_complete = false;
   ASSERT_OK(committed_txn->IsCommitComplete(&is_complete, &completion_status));
diff --git a/src/kudu/integration-tests/txn_status_manager-itest.cc b/src/kudu/integration-tests/txn_status_manager-itest.cc
index baef6be..024942f 100644
--- a/src/kudu/integration-tests/txn_status_manager-itest.cc
+++ b/src/kudu/integration-tests/txn_status_manager-itest.cc
@@ -483,7 +483,7 @@ TEST_F(TxnStatusManagerITest, TxnKeptAliveByClientIfStatusManagerRestarted) {
 
   SleepFor(MonoDelta::FromMilliseconds(5 * kTxnKeepaliveIntervalMs));
 
-  ASSERT_OK(txn->Commit(false /* wait */));
+  ASSERT_OK(txn->StartCommit());
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 1aabbc1..5c238f9 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -338,7 +338,7 @@ TEST_F(TxnWriteOpsITest, CommitTimestampPropagation) {
     ASSERT_EQ(0, session->CountPendingErrors());
 
     const auto ts_before_commit = client_->GetLatestObservedTimestamp();
-    ASSERT_OK(txn->Commit(false));
+    ASSERT_OK(txn->StartCommit());
     const auto ts_after_commit_async = client_->GetLatestObservedTimestamp();
     ASSERT_EQ(ts_before_commit, ts_after_commit_async);
 
@@ -475,7 +475,7 @@ TEST_F(TxnWriteOpsITest, DeadlockPrevention) {
         }
         if (!needs_retry) {
           succeeded = true;
-          ASSERT_OK(txn->Commit(/*wait*/true));
+          ASSERT_OK(txn->Commit());
         }
       }
     });
@@ -579,7 +579,7 @@ TEST_F(TxnWriteOpsITest, FrequentElections) {
           CHECK_OK(session->Apply(op.release()));
         }
         if (iter % 8 == 0) {
-          CHECK_OK(txn->Commit(false/*wait*/));
+          CHECK_OK(txn->StartCommit());
           row_count += kNumRowsPerTxn;
         } else {
           CHECK_OK(txn->Rollback());
@@ -1881,7 +1881,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
     SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
     // Initiate committing the transaction after the delay, but don't wait
     // for the commit to finalize.
-    commit_init_status = txn->Commit(false/*wait*/);
+    commit_init_status = txn->StartCommit();
   });
   auto cleanup = MakeScopedCleanup([&]() {
     committer.join();
@@ -1938,7 +1938,7 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
     SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
     // Initiate committing the transaction after the delay, but don't wait
     // for the commit to finalize.
-    commit_init_status = txn->Commit(false/*wait*/);
+    commit_init_status = txn->StartCommit();
   });
   auto cleanup = MakeScopedCleanup([&]() {
     committer.join();

Reply via email to