This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.15.x by this push:
new 45551a9 KUDU-2612 register txn Java sessions with client
45551a9 is described below
commit 45551a92dab355feb4d6e4ee0532ee8095120a5f
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri May 14 19:22:24 2021 -0700
KUDU-2612 register txn Java sessions with client
Prior to this patch, the assertion in AsyncKuduClient.removeSession()
would trigger upon closing a transactional session via
{AsyncKuduSession,KuduSession}.close().
This patch addresses the issue by registering the newly created
transactional session with the corresponding AsyncKuduClient. In addition,
a test scenario is added to catch regressions.
This is a follow-up to 7432a7a8aa0c98f8d2177c25b99576b51ff33a93.
Change-Id: I117abc938bca0e698854d944c3fd6f831d3f9ce0
Reviewed-on: http://gerrit.cloudera.org:8080/17451
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
(cherry picked from commit b784a41290365d7eacb10048b45ea86708b5b314)
Reviewed-on: http://gerrit.cloudera.org:8080/17453
Reviewed-by: Bankim Bhavsar <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 11 +++++
.../org/apache/kudu/client/KuduTransaction.java | 7 +--
.../apache/kudu/client/TestKuduTransaction.java | 51 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 5 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 35f8477..cfa03cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2614,6 +2614,17 @@ public class AsyncKuduClient implements AutoCloseable {
return closeAllSessions().addCallbackDeferring(new DisconnectCB());
}
+ // Create a new transactional session in the context of the transaction
+ // with the specified identifier.
+ AsyncKuduSession newTransactionalSession(long txnId) {
+ checkIsClosed();
+ AsyncKuduSession session = new AsyncKuduSession(this, txnId);
+ synchronized (sessions) {
+ sessions.add(session);
+ }
+ return session;
+ }
+
private void checkIsClosed() {
if (closed) {
throw new IllegalStateException("Cannot proceed, the client has already
been closed");
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 9ea71b9..4abf71f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -241,7 +241,7 @@ public class KuduTransaction implements AutoCloseable {
synchronized (isInFlightSync) {
Preconditions.checkState(isInFlight);
}
- return new AsyncKuduSession(client, txnId);
+ return client.newTransactionalSession(txnId);
}
/**
@@ -253,10 +253,7 @@ public class KuduTransaction implements AutoCloseable {
* @return a new {@link KuduSession} instance
*/
public KuduSession newKuduSession() {
- synchronized (isInFlightSync) {
- Preconditions.checkState(isInFlight);
- }
- return new KuduSession(new AsyncKuduSession(client, txnId));
+ return new KuduSession(newAsyncKuduSession());
}
/**
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 0a97e8d..3f753d3 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -227,6 +227,57 @@ public class TestKuduTransaction {
}
/**
+ * Transactional sessions can be closed as regular ones.
+ */
+ @Test(timeout = 100000)
+ @MasterServerConfig(flags = {
+ "--txn_manager_enabled",
+ })
+ public void testTxnSessionClose() throws Exception {
+ final String TABLE_NAME = "txn_session_close";
+ client.createTable(
+ TABLE_NAME,
+ ClientTestUtil.getBasicSchema(),
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"),
2));
+ KuduTable table = client.openTable(TABLE_NAME);
+
+ // Open and close an empty transaction session.
+ {
+ KuduTransaction txn = client.newTransaction();
+ assertNotNull(txn);
+ KuduSession session = txn.newKuduSession();
+ assertNotNull(session);
+ assertFalse(session.isClosed());
+ session.close();
+ assertTrue(session.isClosed());
+ }
+
+ // Open new transaction, insert one row for a session, close the session
+ // and then rollback the transaction. No rows should be persisted.
+ {
+ KuduTransaction txn = client.newTransaction();
+ assertNotNull(txn);
+ KuduSession session = txn.newKuduSession();
+ assertNotNull(session);
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+ Insert insert = createBasicSchemaInsert(table, 1);
+ session.apply(insert);
+ session.close();
+
+ txn.rollback();
+
+ assertTrue(session.isClosed());
+ assertEquals(0, session.countPendingErrors());
+
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient,
table)
+ .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+ .build();
+ assertEquals(0, countRowsInScan(scanner));
+ }
+ }
+
+ /**
* Test scenario that starts a new transaction, initiates its commit phase,
* and checks whether the commit is complete using the
* KuduTransaction.isCommitComplete() method.