This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 45ca93f KUDU-2612 automatically flush sessions on txn commit
45ca93f is described below
commit 45ca93f0ef90a8524d0fcd9c3e56e10d5328ca24
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed May 12 00:34:48 2021 -0700
KUDU-2612 automatically flush sessions on txn commit
With this patch, all transactional sessions created off a transaction
handle are automatically flushed upon calling Commit() on the handle.
As for the KuduTransaction::StartCommit() method, it's now necessary
to flush all the transactional sessions created off the transaction
handle before calling the method, otherwise Status::IllegalState()
would be returned.
As Andrew and I discussed offline, it might be an option to return an
error from KuduSession::Apply() for a transactional session whose
transaction has already started committing. However, after looking at
this closer, I realized that it would require either an atomic or an
extra synchronization primitive, bringing more complexity into the hot
path of applying write operations in the context of a session. So,
I opted not to perform the consistency check as a part of the
KuduSession::Apply() method, rather relying on the logic of
KuduSession::Flush() and KuduSession::FlushAsync() methods instead.
Another design detail worth pointing out is that a KuduTransaction handle
keeps shared (not weak) pointers to the transactional sessions originating
from the handle (I did several back-and-forth iterations on this, though).
Even if using shared_ptr, not weak_ptr, no circular dependencies are
introduced since a transactional session doesn't keep a reference
to the corresponding transactional handle. The shared_ptr-based
approach looks better than one with weak_ptr because
(1) It might prevent data loss due to mistakes in application
code, which take time to find and fix.
(2) It looks more portable and consistent if thinking about similar
functionality to implement in the Java client.
This patch also contains several test scenarios for the newly introduced
functionality.
Change-Id: I2480129a99fb19d16868e14f9b9e33c83e3d8e7f
Reviewed-on: http://gerrit.cloudera.org:8080/17431
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/client/client-test.cc | 559 +++++++++++++++++++++-
src/kudu/client/client.h | 28 +-
src/kudu/client/transaction-internal.cc | 36 +-
src/kudu/client/transaction-internal.h | 15 +
src/kudu/integration-tests/txn_write_ops-itest.cc | 82 ++--
5 files changed, 658 insertions(+), 62 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 380bc3a..72725af 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -876,6 +876,22 @@ class ClientTest : public KuduTest {
return Status::OK();
}
+  // Returns a human-readable name of the specified session flush mode,
+  // suitable for SCOPED_TRACE() and log messages.
+  static string FlushModeToString(KuduSession::FlushMode mode) {
+    switch (mode) {
+      case KuduSession::AUTO_FLUSH_BACKGROUND:
+        return "AUTO_FLUSH_BACKGROUND";
+      case KuduSession::AUTO_FLUSH_SYNC:
+        return "AUTO_FLUSH_SYNC";
+      case KuduSession::MANUAL_FLUSH:
+        return "MANUAL_FLUSH";
+    }
+    // Reachable only for a value that doesn't match any known enumerator,
+    // e.g. one produced by casting an arbitrary integer.
+    return "UNKNOWN";
+  }
+
enum WhichServerToKill {
DEAD_MASTER,
DEAD_TSERVER
@@ -3732,20 +3748,7 @@ void ClientTest::TimeInsertOpBatch(
size_t run_num,
const vector<size_t>& string_sizes,
CpuTimes* elapsed) {
-
- string mode_str = "unknown";
- switch (mode) {
- case KuduSession::AUTO_FLUSH_BACKGROUND:
- mode_str = "AUTO_FLUSH_BACKGROND";
- break;
- case KuduSession::AUTO_FLUSH_SYNC:
- mode_str = "AUTO_FLUSH_SYNC";
- break;
- case KuduSession::MANUAL_FLUSH:
- mode_str = "MANUAL_FLUSH";
- break;
- }
-
+ const string mode_str = FlushModeToString(mode);
const size_t row_num = string_sizes.size();
shared_ptr<KuduSession> session(client_->NewSession());
ASSERT_OK(session->SetMutationBufferSpace(buffer_size));
@@ -7341,6 +7344,534 @@ TEST_F(ClientTest, TxnCommit) {
}
}
+// Make sure transactional sessions are automatically flushed upon committing
+// the corresponding transaction.
+TEST_F(ClientTest, FlushTxnSessionsOnCommit) {
+ constexpr auto kNumRows = 64;
+ auto rows_inserted = 0;
+
+ for (auto mode : { KuduSession::AUTO_FLUSH_BACKGROUND,
+ KuduSession::AUTO_FLUSH_SYNC,
+ KuduSession::MANUAL_FLUSH, }) {
+ SCOPED_TRACE(Substitute("session flush mode $0", FlushModeToString(mode)));
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(mode));
+
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+
+ if (mode == KuduSession::MANUAL_FLUSH) {
+ // In AUTO_FLUSH_SYNC there will be no pending operations since ops are
+ // flushed upon KuduSession::Apply(). Due to scheduling anomalies,
+ // in AUTO_FLUSH_BACKGROUND there might be a case when the background
+ // flushing thread flushes the accumulated operations if the main test
+ // thread is put off CPU for a long enough time: to avoid flakiness,
+ // check for the pending operations only in case of MANUAL_FLUSH.
+ ASSERT_TRUE(session->HasPendingOperations());
+ }
+ ASSERT_OK(txn->Commit());
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ // Just in case, check how it works when closing the session explicitly
+ // after it's been automatically flushed by committing its transaction.
+ ASSERT_OK(session->Close());
+
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(rows_inserted, row_count);
+ }
+
+ // Make sure that all the sessions originating from a transaction are flushed
+ // automatically upon committing a transaction, even if there are many.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+
+ vector<shared_ptr<KuduSession>> sessions;
+ for (auto i = 0; i < 10; ++i) {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+ ASSERT_TRUE(session->HasPendingOperations());
+
+ sessions.emplace_back(std::move(session));
+ }
+
+ ASSERT_OK(txn->Commit());
+ for (const auto& session : sessions) {
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ }
+
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(rows_inserted, row_count);
+ }
+
+ // A sub-scenario where the handle of a non-flushed session has gone
+ // out of scope by the time its originating transaction starts committing.
+ // This scenario is added to be explicit about what happens in such a case.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+ ASSERT_TRUE(session->HasPendingOperations());
+ }
+ ASSERT_OK(txn->Commit());
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ // The rows inserted above should be there even if the session handle went
+ // out of scope: the transaction handle still references the session.
+ ASSERT_EQ(rows_inserted, row_count);
+ }
+}
+
+// Make sure it's possible to retry committing a transaction after one of its
+// sessions failed to flush pending operations on the first commit attempt.
+// Since the first attempt to commit failed due to an error while flushing
+// the session's pending operations, the transaction is still open. It should
+// be still possible to insert more rows via the already existing session and
+// commit them along with the rows successfully flushed on the first attempt.
+TEST_F(ClientTest, TxnRetryCommitAfterSessionFlushErrors) {
+  shared_ptr<KuduTransaction> txn;
+  ASSERT_OK(client_->NewTransaction(&txn));
+
+  shared_ptr<KuduSession> session;
+  ASSERT_OK(txn->CreateSession(&session));
+  ASSERT_NE(nullptr, session.get());
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1));
+  auto rows_inserted = 1;
+
+  // Try to insert a row with the same key. With a duplicate row, an attempt
+  // to flush this session should fail.
+  NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1));
+  ASSERT_TRUE(session->HasPendingOperations());
+
+  // Attempt to commit the transaction: flushing the session fails.
+  const auto s = txn->Commit();
+  const auto errmsg = s.ToString();
+  ASSERT_TRUE(s.IsIOError()) << errmsg;
+  ASSERT_STR_MATCHES(errmsg, "Some errors occurred");
+
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(1, session->CountPendingErrors());
+  {
+    vector<KuduError*> errors;
+    ElementDeleter drop(&errors);
+    session->GetPendingErrors(&errors, nullptr);
+    ASSERT_EQ(1, errors.size());
+    EXPECT_TRUE(errors[0]->status().IsAlreadyPresent());
+  }
+
+  // Nothing is committed yet.
+  auto row_count = CountRowsFromClient(client_table_.get(),
+                                       KuduClient::LEADER_ONLY,
+                                       KuduScanner::READ_YOUR_WRITES);
+  ASSERT_EQ(0, row_count);
+
+  // However, it's not possible to start another transactional session once
+  // KuduTransaction::{Commit,StartCommit}() has already been called.
+  {
+    shared_ptr<KuduSession> null_session;
+    const auto s = txn->CreateSession(&null_session);
+    const auto errmsg = s.ToString();
+    ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+    ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+    ASSERT_EQ(nullptr, null_session.get());
+  }
+
+  // Retry the commit after inserting extra rows via the same session: the
+  // transaction is still open, so these rows are committed along with the
+  // row successfully flushed during the first attempt.
+  NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 10, 1));
+  rows_inserted += 10;
+  ASSERT_OK(txn->Commit());
+
+  // There should be no pending operations for the session. All its pending
+  // errors have been handled already, and no new ones should appear.
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+
+  row_count = CountRowsFromClient(client_table_.get(),
+                                  KuduClient::LEADER_ONLY,
+                                  KuduScanner::READ_YOUR_WRITES);
+  ASSERT_EQ(rows_inserted, row_count);
+}
+
+// Make sure KuduTransaction::StartCommit() succeeds when called on
+// a transaction handle which has all of its transactional sessions flushed.
+TEST_F(ClientTest, StartCommitWithFlushedTxnSessions) {
+ constexpr auto kNumRows = 1024;
+ auto rows_inserted = 0;
+
+ for (auto mode : { KuduSession::AUTO_FLUSH_BACKGROUND,
+ KuduSession::AUTO_FLUSH_SYNC,
+ KuduSession::MANUAL_FLUSH, }) {
+ SCOPED_TRACE(Substitute("session flush mode $0", FlushModeToString(mode)));
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(mode));
+
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+
+ ASSERT_OK(session->Flush());
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ ASSERT_OK(txn->StartCommit());
+
+ // StartCommit() doesn't wait for the commit phase to finalize: poll for it.
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete = false;
+ Status commit_status;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &commit_status));
+ ASSERT_OK(commit_status);
+ ASSERT_TRUE(is_complete);
+ });
+
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(rows_inserted, row_count);
+ }
+
+ // A sub-scenario where the handle of an already flushed session has gone
+ // out of scope before its originating transaction starts committing.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+ ASSERT_OK(session->Flush());
+ ASSERT_FALSE(session->HasPendingOperations());
+ }
+ ASSERT_OK(txn->StartCommit());
+
+ // StartCommit() doesn't wait for the commit phase to finalize: poll for it.
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete = false;
+ Status commit_status;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &commit_status));
+ ASSERT_OK(commit_status);
+ ASSERT_TRUE(is_complete);
+ });
+
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(rows_inserted, row_count);
+ }
+}
+
+// Check that KuduTransaction::StartCommit() returns Status::IllegalState()
+// when a transactional session created off the handle has pending operations.
+TEST_F(ClientTest, TxnNonFlushedSessionsOnStartCommit) {
+ constexpr auto kNumRows = 1024;
+ auto rows_inserted = 0;
+
+ // It should not be possible to call KuduTransaction::StartCommit()
+ // when there are transactional sessions with pending write operations.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+
+ ASSERT_TRUE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ const auto s = txn->StartCommit();
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg,
+ "cannot start committing transaction: "
+ "at least one transactional session has write operations pending");
+ }
+
+ // A sub-scenario where the handle of a non-flushed session has gone out of
+ // scope: the transaction handle still keeps a reference to the session.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ ASSERT_TRUE(session->HasPendingOperations());
+ }
+ const auto s = txn->StartCommit();
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg,
+ "cannot start committing transaction: "
+ "at least one transactional session has write operations pending");
+ }
+}
+
+// Verify that KuduTransaction::CreateSession() returns Status::IllegalState()
+// once the commit process has been started for the corresponding transaction.
+TEST_F(ClientTest, TxnCreateSessionAfterCommit) {
+ // An empty transaction case.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ ASSERT_OK(txn->Commit());
+
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+
+ // A non-empty transaction case: a transaction with one empty session.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(txn->Commit());
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ }
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(0, row_count);
+
+ // Now attempt to create a new session based off the committed transaction.
+ {
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+ }
+
+ // A non-empty transaction case: a transaction with a session that inserts
+ // at least one row in a table.
+ {
+ constexpr auto kNumRows = 1;
+
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kNumRows));
+ ASSERT_OK(txn->Commit());
+
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(kNumRows, row_count);
+
+ // Now try to create a new session based off the committed transaction.
+ {
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+ }
+}
+
+// Similar to the TxnCreateSessionAfterCommit scenario above, but calls
+// KuduTransaction::StartCommit() instead of KuduTransaction::Commit().
+TEST_F(ClientTest, TxnCreateSessionAfterStartCommit) {
+ // An empty transaction case.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ ASSERT_OK(txn->StartCommit());
+
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+
+ // A non-empty transaction case: a transaction with one empty session.
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ {
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(txn->StartCommit());
+ }
+
+ // Now attempt to create a new session based off the transaction that
+ // has started committing.
+ {
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+
+ // Wait for the transaction to finalize its commit phase.
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete = false;
+ Status commit_status;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &commit_status));
+ ASSERT_OK(commit_status);
+ ASSERT_TRUE(is_complete);
+ });
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(0, row_count);
+ }
+
+ // A non-empty transaction case: a transaction with a session that inserts
+ // at least one row in a table.
+ {
+ constexpr auto kNumRows = 1;
+
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ NO_FATALS(InsertTestRows(client_table_.get(), session.get(), kNumRows));
+ ASSERT_OK(session->Flush());
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_OK(txn->StartCommit());
+
+ // Now try to create a new session based off the committing transaction.
+ {
+ shared_ptr<KuduSession> session;
+ const auto s = txn->CreateSession(&session);
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction commit has already started");
+ ASSERT_EQ(nullptr, session.get());
+ }
+
+ // Wait for the transaction to finalize its commit phase.
+ ASSERT_EVENTUALLY([&] {
+ bool is_complete = false;
+ Status commit_status;
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &commit_status));
+ ASSERT_OK(commit_status);
+ ASSERT_TRUE(is_complete);
+ });
+ ASSERT_EQ(0, session->CountPendingErrors());
+ const auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(kNumRows, row_count);
+ }
+}
+
+// A test scenario to verify the behavior of the client API when a write
+// operation is submitted into a transactional session after the transaction
+// has been committed.
+TEST_F(ClientTest, SubmitWriteOpAfterTxnCommit) {
+ constexpr auto kNumRows = 1024;
+ auto rows_inserted = 0;
+ {
+ shared_ptr<KuduTransaction> txn;
+ ASSERT_OK(client_->NewTransaction(&txn));
+ shared_ptr<KuduSession> session;
+ ASSERT_OK(txn->CreateSession(&session));
+ ASSERT_NE(nullptr, session.get());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), kNumRows, rows_inserted));
+ rows_inserted += kNumRows;
+ ASSERT_TRUE(session->HasPendingOperations());
+ ASSERT_OK(txn->Commit());
+ ASSERT_FALSE(session->HasPendingOperations());
+ ASSERT_EQ(0, session->CountPendingErrors());
+
+ auto row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(kNumRows, row_count);
+
+ // Try to insert one more row: flushing it should fail since the txn is committed.
+ NO_FATALS(InsertTestRows(
+ client_table_.get(), session.get(), 1, rows_inserted));
+ const auto s = session->Flush();
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ {
+ vector<KuduError*> errors;
+ ElementDeleter drop(&errors);
+ session->GetPendingErrors(&errors, nullptr);
+ ASSERT_EQ(1, errors.size());
+ const auto& s = errors[0]->status();
+ const auto errmsg = s.ToString();
+ ASSERT_TRUE(s.IsIllegalState()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction ID .* not open: COMMITTED");
+ }
+
+ // Row count stays the same as before, of course.
+ row_count = CountRowsFromClient(client_table_.get(),
+ KuduClient::LEADER_ONLY,
+ KuduScanner::READ_YOUR_WRITES);
+ ASSERT_EQ(kNumRows, row_count);
+ }
+}
+
// This test verifies the behavior of KuduTransaction instance when the bound
// KuduClient instance gets out of scope.
TEST_F(ClientTest, TxnHandleLifecycle) {
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 8742738..da8c26f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -380,8 +380,14 @@ class KUDU_EXPORT KuduTransaction :
/// Commit the transaction.
///
- /// This method initiates committing the transaction and then awaits
- /// for the transaction's commit phase to finalize.
+ /// This method automatically flushes all transactional sessions created off
+ /// this transaction handle via @c KuduTransaction::CreateSession(), initiates
+ /// committing the transaction, and then waits for the commit phase to
+ /// finalize. Flushing all the derivative transactional sessions helps
+ /// avoid unintentional data loss when those sessions are not flushed
+ /// explicitly before committing. No new operations should be pushed into the
+ /// derivative transactional sessions created off this handle
+ /// once the method has been called.
///
/// @return Returns @c Status::OK() if all the stages of the transaction's
/// commit sequence were successful, i.e. the status of various pre-commit
@@ -393,11 +399,17 @@ class KUDU_EXPORT KuduTransaction :
/// Start committing this transaction, but don't wait for the commit phase
/// to finalize.
///
- /// This method initiates the commit phase for this transaction, not awaiting
- /// for the commit phase to finalize. To check for the transaction's commit
- /// status, use the @c KuduTransaction::IsCommitComplete() method.
- ///
- /// @return Status of starting the commit phase for this transaction.
+ /// This method initiates the commit phase for this transaction, not waiting
+ /// for the commit phase to finalize. It requires all the transactional
+ /// sessions created off this handle via @c KuduTransaction::CreateSession()
+ /// to be flushed already. No new operations should be pushed into the
+ /// derivative transactional sessions created off this handle once the method
+ /// has been called. To check for the transaction's commit status, use the
+ /// @c KuduTransaction::IsCommitComplete() method.
+ ///
+ /// @return Status of starting the commit phase for this transaction if all
+ /// the transactional sessions created off this handle are flushed,
+ /// otherwise returns @c Status::IllegalState().
Status StartCommit() WARN_UNUSED_RESULT;
/// Whether the commit has completed i.e. no longer in progress of finalizing.
@@ -2269,7 +2281,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
/// Flush any pending writes.
///
/// This method initiates flushing of the current batch of buffered
- /// write operations, if any, and then awaits for completion of all
+ /// write operations, if any, and then waits for the completion of all
/// pending operations of the session. I.e., after successful return
/// from this method no pending operations should be left in the session.
///
diff --git a/src/kudu/client/transaction-internal.cc b/src/kudu/client/transaction-internal.cc
index 7113a50..3960aed 100644
--- a/src/kudu/client/transaction-internal.cc
+++ b/src/kudu/client/transaction-internal.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <functional>
#include <memory>
+#include <mutex>
#include <ostream>
#include <string>
@@ -75,7 +76,8 @@ MonoTime GetRpcDeadline(const KuduClient* c) {
KuduTransaction::Data::Data(const sp::shared_ptr<KuduClient>& client)
: weak_client_(client),
- txn_keep_alive_ms_(0) {
+ txn_keep_alive_ms_(0),
+ commit_started_(false) {
CHECK(client);
}
@@ -112,6 +114,18 @@ Status KuduTransaction::Data::CreateSession(sp::shared_ptr<KuduSession>* session
// there isn't much sense duplicating that at the client side.
sp::shared_ptr<KuduSession> ret(new KuduSession(c, txn_id_));
ret->data_->Init(ret);
+
+ {
+ std::lock_guard<simple_spinlock> l(commit_started_lock_);
+ if (PREDICT_FALSE(commit_started_)) {
+ return Status::IllegalState("transaction commit has already started");
+ }
+ // Store the information about the newly created session: this is needed
+ // to automatically flush every associated session upon call of
+ // KuduTransaction::Commit() for the corresponding transaction handle.
+ txn_sessions_.emplace_back(ret);
+ }
+
*session = std::move(ret);
return Status::OK();
}
@@ -168,6 +182,26 @@ Status KuduTransaction::Data::Commit(CommitMode mode) {
return Status::IllegalState("associated KuduClient is gone");
}
+ {
+ std::lock_guard<simple_spinlock> l(commit_started_lock_);
+ commit_started_ = true;
+ }
+ // In case of 'synchronous' commit mode (i.e. if waiting for the transaction
+ // to finalize), flush all transactional sessions created out of this handle.
+ // In case of 'asynchronous' commit mode, make sure no transactional session
+ // contains pending operations.
+ for (auto& session : txn_sessions_) {
+ if (mode == CommitMode::WAIT_FOR_COMPLETION) {
+ RETURN_NOT_OK(session->Flush());
+ } else {
+ if (session->HasPendingOperations()) {
+ return Status::IllegalState(
+ "cannot start committing transaction: at least one transactional "
+ "session has write operations pending");
+ }
+ }
+ }
+
const auto deadline = GetRpcDeadline(c.get());
CommitTransactionResponsePB resp;
{
diff --git a/src/kudu/client/transaction-internal.h b/src/kudu/client/transaction-internal.h
index f4471dd..fc1de3f 100644
--- a/src/kudu/client/transaction-internal.h
+++ b/src/kudu/client/transaction-internal.h
@@ -17,6 +17,7 @@
#pragma once
#include <cstdint>
+#include <list>
#include <memory>
#include <string>
@@ -24,6 +25,7 @@
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -111,6 +113,19 @@ class KuduTransaction::Data {
uint32_t txn_keep_alive_ms_;
TxnId txn_id_;
+ // Sessions created in the context of this transaction. This per-transaction
+ // information is retained to flush all sessions created in the context
+ // of this transaction upon calling Commit().
+ std::list<sp::shared_ptr<KuduSession>> txn_sessions_;
+
+ // Whether the commit of the transaction identified by 'txn_id_' is about
+ // to start or has already started. Guarded by 'commit_started_lock_'.
+ bool commit_started_;
+
+ // Protects against concurrent access to 'commit_started_' and
+ // 'txn_sessions_' above.
+ simple_spinlock commit_started_lock_;
+
DISALLOW_COPY_AND_ASSIGN(Data);
};
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 6acfbaf..319d216 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -1561,9 +1561,19 @@ TEST_F(TxnOpDispatcherITest, ErrorInProcessingWriteOp) {
ASSERT_EQ(2, GetTxnOpDispatchersTotalCount());
// Try to insert rows with duplicate keys.
- int64_t duplicate_key = 0;
- s = InsertRows(txn.get(), kNumRows, &duplicate_key);
- ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ {
+ int64_t duplicate_key = 0;
+ shared_ptr<KuduSession> session;
+ s = InsertRows(txn.get(), kNumRows, &duplicate_key, &session);
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ vector<KuduError*> errors;
+ ElementDeleter drop(&errors);
+ session->GetPendingErrors(&errors, nullptr);
+ for (const auto* e : errors) {
+ const auto& s = e->status();
+ EXPECT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+ }
+ }
// Same TxnOpDispatchers should be handling all the write operations.
ASSERT_EQ(2, GetTxnOpDispatchersTotalCount());
@@ -1888,11 +1898,11 @@ TEST_F(TxnOpDispatcherITest, DuplicateTxnParticipantRegistration) {
ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
}
-// This scenario exercises the case when a request to commit a transaction
+// This scenario exercises the case when a request to rollback a transaction
// arrives while TxnOpDispatcher still has pending write requests in its queue.
-// Neither the registration of the txn participant is complete nor BEGIN_TXN is
-// sent when client issues the commit request.
-TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered) {
+// The registration of the txn participant is not yet complete when the rollback
+// request arrives.
+TEST_F(TxnOpDispatcherITest, RollbackWriteOpPendingParticipantNotYetRegistered) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr auto kDelayMs = 1000;
@@ -1903,15 +1913,13 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
- Status commit_init_status;
- thread committer([&txn, &commit_init_status]{
+ Status rollback_status;
+ thread rollback([&txn, &rollback_status]{
SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
- // Initiate committing the transaction after the delay, but don't wait
- // for the commit to finalize.
- commit_init_status = txn->StartCommit();
+ rollback_status = txn->Rollback();
});
auto cleanup = MakeScopedCleanup([&]() {
- committer.join();
+ rollback.join();
});
shared_ptr<KuduSession> session;
@@ -1921,15 +1929,19 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
const auto row_status = GetSingleRowError(session.get());
ASSERT_TRUE(row_status.IsIllegalState()) << s.ToString();
- committer.join();
+ rollback.join();
cleanup.cancel();
- ASSERT_OK(commit_init_status);
+ ASSERT_OK(rollback_status);
bool is_complete = false;
Status completion_status;
- ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
- ASSERT_TRUE(is_complete);
- ASSERT_OK(completion_status);
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+ ASSERT_TRUE(is_complete);
+ });
+ const auto errmsg = completion_status.ToString();
+ ASSERT_TRUE(completion_status.IsAborted()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction has been aborted");
size_t num_rows;
ASSERT_OK(CountRows(table_.get(), &num_rows));
@@ -1941,15 +1953,12 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantNotYetRegistered
ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
}
-// This scenario exercises the case when a request to commit a transaction
+// This scenario exercises the case when a request to rollback a transaction
// arrives while TxnOpDispatcher still has pending write requests in its queue
// but it has already completed the registration of the txn participant.
-// BEGIN_TXN hasn't yet been sent for the participant tablet when the commit
-// request is issued by the client.
-//
-// TODO(aserbin): enable the scenario after the follow-up for
-// https://gerrit.cloudera.org/#/c/17127/ is merged
-TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
+// BEGIN_TXN hasn't yet been sent for the participant tablet when the rollback
+// request arrives.
+TEST_F(TxnOpDispatcherITest, RollbackWriteOpPendingParticipantRegistered) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr auto kDelayMs = 1000;
@@ -1960,15 +1969,13 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
shared_ptr<KuduTransaction> txn;
ASSERT_OK(client_->NewTransaction(&txn));
- Status commit_init_status;
- thread committer([&txn, &commit_init_status]{
+ Status rollback_status;
+ thread rollback([&txn, &rollback_status]{
SleepFor(MonoDelta::FromMilliseconds(kDelayMs));
- // Initiate committing the transaction after the delay, but don't wait
- // for the commit to finalize.
- commit_init_status = txn->StartCommit();
+ rollback_status = txn->Rollback();
});
auto cleanup = MakeScopedCleanup([&]() {
- committer.join();
+ rollback.join();
});
shared_ptr<KuduSession> session;
@@ -1976,26 +1983,23 @@ TEST_F(TxnOpDispatcherITest, CommitWithWriteOpPendingParticipantRegistered) {
Status s = InsertRows(txn.get(), 1, &key, &session);
ASSERT_TRUE(s.IsIOError()) << s.ToString();
- // Since we tried to commit without allowing all participants to quiesce ops,
- // the transaction should automatically fail.
- committer.join();
+ rollback.join();
cleanup.cancel();
- ASSERT_OK(commit_init_status);
+ ASSERT_OK(rollback_status);
bool is_complete = false;
Status completion_status;
ASSERT_EVENTUALLY([&] {
ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
ASSERT_TRUE(is_complete);
- ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
});
+ const auto errmsg = completion_status.ToString();
+ ASSERT_TRUE(completion_status.IsAborted()) << errmsg;
+ ASSERT_STR_MATCHES(errmsg, "transaction has been aborted");
size_t num_rows = 0;
ASSERT_OK(CountRows(table_.get(), &num_rows));
ASSERT_EQ(0, num_rows);
-
- // Since the commit has been successfully finalized, there should be no
- // TxnOpDispatcher for the transaction.
ASSERT_EQ(0, GetTxnOpDispatchersTotalCount());
}