This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch tx-reuse-it in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 78f8bb2e7daa8ca0587a42f5d86a6b675d6810e1 Author: Ken Hu <[email protected]> AuthorDate: Tue Jan 13 16:57:09 2026 -0800 Add integration testing for session connection reuse CTR. --- CHANGELOG.asciidoc | 2 + docs/src/reference/gremlin-variants.asciidoc | 1 + .../apache/tinkerpop/gremlin/driver/Client.java | 2 + .../driver/remote/DriverRemoteConnection.java | 5 +- .../server/GremlinSessionReuseTxIntegrateTest.java | 273 ++++++++++++++++++++- 5 files changed, 281 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 271bc6d921..05c4cad26b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Integrated Python driver examples into automated build process to ensure examples remain functional. * Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback. +* Added `SessionedChildClient` that borrows connections from a different `Client` for use with `Sessions`. +* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether to use `SessionedChildClient` for remote transactions. [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: November 12, 2025) diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 100c0013a4..99f50f5793 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -864,6 +864,7 @@ The following table describes the various configuration options for the Gremlin |path |The URL path to the Gremlin Server. |_/gremlin_ |port |The port of the Gremlin Server to connect to. The same port will be applied for all hosts. |8192 |protocol |Sets the `AuthProperties.Property.PROTOCOL` properties for authentication to Gremlin Server. |_none_ +|reuseConnectionsForSessions |Uses a `Client` that will attempt to reuse `Connections` when managing remote transactions with a `DriverRemoteConnection`. |false |serializer.className |The fully qualified class name of the `MessageSerializer` that will be used to communicate with the server. Note that the serializer configured on the client should be supported by the server configuration. |_none_ |serializer.config |A `Map` of configuration settings for the serializer. |_none_ |username |The username to submit on requests that require authentication. |_none_ diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 258c63b2d7..82c325aa0f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -820,6 +820,7 @@ public abstract class Client { if (borrowedConnection == null) { //Borrow from parentClient's pool instead of creating new connection borrowedConnection = parentClient.chooseConnection(msg); + logger.debug("SessionedChildClient choosing {}", borrowedConnection); } //Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback borrowedConnection.borrowed.incrementAndGet(); @@ -833,6 +834,7 @@ public abstract class Client { //Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially //returnToPool() does this borrowedConnection.returnToPool(); + logger.debug("Session closed for {} with count {}", borrowedConnection, borrowedConnection.borrowed.get()); borrowedConnection = null; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index 9eb6ad5382..dc1c1c8b06 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -235,7 +235,10 @@ public class DriverRemoteConnection implements RemoteConnection { */ Optional<String> getSessionId() { if (client instanceof Client.SessionedClient) { - Client.SessionedClient c = (Client.SessionedClient) client; + final Client.SessionedClient c = (Client.SessionedClient) client; + return Optional.of(c.getSessionId()); + } else if (client instanceof Client.SessionedChildClient) { + final Client.SessionedChildClient c = (Client.SessionedChildClient) client; return Optional.of(c.getSessionId()); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java index e49349dee6..02c64a0b95 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java @@ -18,7 +18,22 @@ */ package org.apache.tinkerpop.gremlin.server; +import nl.altindag.log.LogCaptor; import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Integration tests for gremlin-driver and bytecode sessions where the underlying connection can be re-used for @@ -26,9 +41,11 @@ import org.apache.tinkerpop.gremlin.driver.Cluster; */ public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegrateTest { + public static Pattern CHANNEL_ID_PATTERN = Pattern.compile("SessionedChildClient choosing.*\\{channel=(.*)\\}"); + @Override protected Cluster createCluster() { - return TestClientFactory.build().create(); + return TestClientFactory.build().reuseConnectionsForSessions(true).create(); } /** @@ -43,4 +60,258 @@ public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegra return settings; } + + @Test + public void shouldCleanupResourcesAfterSuccessfulCommit() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + assertEquals(0, (long) g.V().count().next()); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().commit(); + + // Check the both client and server side "sessions" are closed. + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + assertEquals(1, (long) g.V().count().next()); + + cluster.close(); + logCaptor.close(); + } + + @Test + public void shouldCleanupResourcesAfterSuccessfulRollback() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + assertEquals(0, (long) g.V().count().next()); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().rollback(); + + // Check the both client and server side "sessions" are closed. + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + assertEquals(0, (long) g.V().count().next()); + + cluster.close(); + logCaptor.close(); + } + + @Test + public void shouldNotAllowCommittedGtxToBeReused() throws Exception { + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().commit(); + + try { + gtx.addV("software").iterate(); + gtx.tx().commit(); + fail("gtx that has already been committed shouldn't be reusable."); + } catch (Exception e) { + // Consider any exception to be correct behavior. + } + + cluster.close(); + } + + @Test + public void shouldAllowMultipleTransactionsOnDifferentConnection() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + final Pattern channelCountPattern = Pattern.compile("Session closed for.*count\\s(.*)"); + + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + final GraphTraversalSource gtx2 = g.tx().begin(); + + gtx1.addV("person").iterate(); + assertEquals(1, (long) gtx1.V().count().next()); + + gtx2.addV("software").iterate(); + assertEquals(1, (long) gtx1.V().count().next()); + + gtx1.tx().commit(); + gtx2.tx().commit(); + + assertEquals(2, (long) g.V().count().next()); + + // Ensure that two different underlying connections were used. + final Set<String> channelIds = new HashSet<>(); + final List<String> lines = logCaptor.getLogs(); + for (String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + final Matcher countMatcher = channelCountPattern.matcher(line); + if (idMatcher.find()) { + channelIds.add(idMatcher.group(1)); + } else if (countMatcher.find()) { + // Check that the client properly updates the borrowed count so that it's reset to zero after session closed + assertEquals(0, Integer.parseInt(countMatcher.group(1))); + } + } + + assertEquals(2, channelIds.size()); + + cluster.close(); + logCaptor.close(); + } + + @Test + public void shouldAllowMultipleTransactionsOnSameConnection() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + + // Cluster setup that has a single connection so simultaneous transactions must share it. + final Cluster cluster = TestClientFactory. + build(). + reuseConnectionsForSessions(true). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + create(); + + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + final GraphTraversalSource gtx2 = g.tx().begin(); + + gtx1.addV("person").iterate(); + gtx2.addV("software").iterate(); + gtx1.addV("place").iterate(); + + assertEquals(0, (long) g.V().count().next()); + gtx1.tx().commit(); + assertEquals(2, (long) g.V().count().next()); + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + + gtx2.tx().commit(); + assertEquals(3, (long) g.V().count().next()); + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + + cluster.close(); + logCaptor.close(); + } + + @Test + public void shouldReuseSameConnectionForSubsequentTransactionAfterCommit() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + + String channelId = ""; + + // Cluster setup with single connection to ensure that transaction state isn't shared even though connection + // is reused. + final Cluster cluster = TestClientFactory.build(). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + reuseConnectionsForSessions(true). + create(); + + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + gtx1.addV("person1").iterate(); + gtx1.tx().commit(); + + List<String> lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + // Save the channelId used for this transaction for comparison with the subsequent one. + channelId = idMatcher.group(1); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + final GraphTraversalSource gtx2 = g.tx().begin(); + gtx2.addV("person2").iterate(); + gtx2.tx().commit(); + + lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + // Ensure that same connection was used for both transactions. + assertEquals(channelId, idMatcher.group(1)); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + assertEquals(2, (long) g.V().count().next()); + + cluster.close(); + logCaptor.close(); + } + + @Test + public void shouldReuseSameConnectionForSubsequentTransactionAfterRollback() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + logCaptor.setLogLevelToDebug(); + + String channelId = ""; + + final Cluster cluster = TestClientFactory.build(). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + reuseConnectionsForSessions(true). + create(); + + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + gtx1.addV("person1").iterate(); + gtx1.tx().rollback(); + + List<String> lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + channelId = idMatcher.group(1); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + final GraphTraversalSource gtx2 = g.tx().begin(); + gtx2.addV("person2").iterate(); + gtx2.tx().commit(); + + lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + assertEquals(channelId, idMatcher.group(1)); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + assertEquals(1, (long) g.V().count().next()); + + cluster.close(); + logCaptor.close(); + } + + private void assertClientAndServerSessionResourcesClosed(final List<String> logLines) { + boolean clientSessionClosed = false; + boolean serverSessionClosed = false; + for (final String line : logLines) { + if (line.matches("Session.*closed$")) { + serverSessionClosed = true; + } else if (line.matches("Session closed for Connection.*")) { + clientSessionClosed = true; + } + } + assertTrue(clientSessionClosed); + assertTrue(serverSessionClosed); + } }
