This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch tx-reuse-it in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 36ec2928f253b99d12b48d82da4cd68be1eb210e Author: Ken Hu <[email protected]> AuthorDate: Tue Jan 13 16:57:09 2026 -0800 Add integration testing for session connection reuse CTR. The LogCaptor is added per test to prevent polluting the output of the integration tests with lots of unnecessary server logs. There are some inherited tests (namely shouldOpenAndCloseObsceneAmountOfSessions()) which can generate tens of thousands of lines of output. --- CHANGELOG.asciidoc | 2 + docs/src/reference/gremlin-variants.asciidoc | 1 + .../apache/tinkerpop/gremlin/driver/Client.java | 2 + .../driver/remote/DriverRemoteConnection.java | 5 +- .../server/GremlinSessionReuseTxIntegrateTest.java | 294 ++++++++++++++++++++- 5 files changed, 302 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 271bc6d921..05c4cad26b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -25,6 +25,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Integrated Python driver examples into automated build process to ensure examples remain functional. * Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback. +* Added `SessionedChildClient` that borrows connections from a different `Client` for use with `Sessions`. +* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether to use `SessionedChildClient` for remote transactions. [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: November 12, 2025) diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 100c0013a4..99f50f5793 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -864,6 +864,7 @@ The following table describes the various configuration options for the Gremlin |path |The URL path to the Gremlin Server. |_/gremlin_ |port |The port of the Gremlin Server to connect to. The same port will be applied for all hosts. |8192 |protocol |Sets the `AuthProperties.Property.PROTOCOL` properties for authentication to Gremlin Server. |_none_ +|reuseConnectionsForSessions |Uses a `Client` that will attempt to reuse `Connections` when managing remote transactions with a `DriverRemoteConnection`. |false |serializer.className |The fully qualified class name of the `MessageSerializer` that will be used to communicate with the server. Note that the serializer configured on the client should be supported by the server configuration. |_none_ |serializer.config |A `Map` of configuration settings for the serializer. |_none_ |username |The username to submit on requests that require authentication. |_none_ diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 258c63b2d7..82c325aa0f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -820,6 +820,7 @@ public abstract class Client { if (borrowedConnection == null) { //Borrow from parentClient's pool instead of creating new connection borrowedConnection = parentClient.chooseConnection(msg); + logger.debug("SessionedChildClient choosing {}", borrowedConnection); } //Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback borrowedConnection.borrowed.incrementAndGet(); @@ -833,6 +834,7 @@ public abstract class Client { //Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially //returnToPool() does this borrowedConnection.returnToPool(); + logger.debug("Session closed for {} with count {}", borrowedConnection, borrowedConnection.borrowed.get()); borrowedConnection = null; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index 9eb6ad5382..dc1c1c8b06 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -235,7 +235,10 @@ public class DriverRemoteConnection implements RemoteConnection { */ Optional<String> getSessionId() { if (client instanceof Client.SessionedClient) { - Client.SessionedClient c = (Client.SessionedClient) client; + final Client.SessionedClient c = (Client.SessionedClient) client; + return Optional.of(c.getSessionId()); + } else if (client instanceof Client.SessionedChildClient) { + final Client.SessionedChildClient c = (Client.SessionedChildClient) client; return Optional.of(c.getSessionId()); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java index e49349dee6..44766fb7d5 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java @@ -18,7 +18,23 @@ */ package org.apache.tinkerpop.gremlin.server; +import nl.altindag.log.LogCaptor; import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; /** * Integration tests for gremlin-driver and bytecode sessions where the underlying connection can be re-used for @@ -26,9 +42,11 @@ import org.apache.tinkerpop.gremlin.driver.Cluster; */ public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegrateTest { + public static Pattern CHANNEL_ID_PATTERN = Pattern.compile("SessionedChildClient choosing.*\\{channel=(.*)\\}"); + @Override protected Cluster createCluster() { - return TestClientFactory.build().create(); + return TestClientFactory.build().reuseConnectionsForSessions(true).create(); } /** @@ -43,4 +61,278 @@ public class GremlinSessionReuseTxIntegrateTest extends AbstractSessionTxIntegra return settings; } + + @Test + public void shouldCleanupResourcesAfterSuccessfulCommit() throws Exception { + assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer()); + + final LogCaptor logCaptor = LogCaptor.forRoot(); + final Cluster cluster = createCluster(); + try { + logCaptor.setLogLevelToDebug(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + assertEquals(0, (long) g.V().count().next()); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().commit(); + + // Check the both client and server side "sessions" are closed. + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + assertEquals(1, (long) g.V().count().next()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + @Test + public void shouldCleanupResourcesAfterSuccessfulRollback() throws Exception { + assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer()); + + final LogCaptor logCaptor = LogCaptor.forRoot(); + final Cluster cluster = createCluster(); + try { + logCaptor.setLogLevelToDebug(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + assertEquals(0, (long) g.V().count().next()); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().rollback(); + + // Check the both client and server side "sessions" are closed. + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + assertEquals(0, (long) g.V().count().next()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + @Test + public void shouldNotAllowCommittedGtxToBeReused() throws Exception { + final Cluster cluster = createCluster(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").iterate(); + gtx.tx().commit(); + + try { + gtx.addV("software").iterate(); + gtx.tx().commit(); + fail("gtx that has already been committed shouldn't be reusable."); + } catch (Exception e) { + // Consider any exception to be correct behavior. + } + + cluster.close(); + } + + @Test + public void shouldAllowMultipleTransactionsOnDifferentConnection() throws Exception { + final LogCaptor logCaptor = LogCaptor.forRoot(); + final Cluster cluster = createCluster(); + try { + logCaptor.setLogLevelToDebug(); + + final Pattern channelCountPattern = Pattern.compile("Session closed for.*count\\s(.*)"); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + final GraphTraversalSource gtx2 = g.tx().begin(); + + gtx1.addV("person").iterate(); + assertEquals(1, (long) gtx1.V().count().next()); + + gtx2.addV("software").iterate(); + assertEquals(1, (long) gtx1.V().count().next()); + + gtx1.tx().commit(); + gtx2.tx().commit(); + + assertEquals(2, (long) g.V().count().next()); + + // Ensure that two different underlying connections were used. + final Set<String> channelIds = new HashSet<>(); + final List<String> lines = logCaptor.getLogs(); + for (String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + final Matcher countMatcher = channelCountPattern.matcher(line); + if (idMatcher.find()) { + channelIds.add(idMatcher.group(1)); + } else if (countMatcher.find()) { + // Check that the client properly updates the borrowed count so that it's reset to zero after session + // closed, but make the check a bit fuzzy since returning is async and so it may not hit 0 in time + assertTrue(Integer.parseInt(countMatcher.group(1)) < 2); + } + } + + assertEquals(2, channelIds.size()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + @Test + public void shouldAllowMultipleTransactionsOnSameConnection() throws Exception { + assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer()); + final LogCaptor logCaptor = LogCaptor.forRoot(); + // Cluster setup that has a single connection so simultaneous transactions must share it. + final Cluster cluster = TestClientFactory. + build(). + reuseConnectionsForSessions(true). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + create(); + try { + logCaptor.setLogLevelToDebug(); + + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + final GraphTraversalSource gtx2 = g.tx().begin(); + + gtx1.addV("person").iterate(); + gtx2.addV("software").iterate(); + gtx1.addV("place").iterate(); + + assertEquals(0, (long) g.V().count().next()); + gtx1.tx().commit(); + assertEquals(2, (long) g.V().count().next()); + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + + gtx2.tx().commit(); + assertEquals(3, (long) g.V().count().next()); + assertClientAndServerSessionResourcesClosed(logCaptor.getLogs()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + @Test + public void shouldReuseSameConnectionForSubsequentTransactionAfterCommit() throws Exception { + assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer()); + final LogCaptor logCaptor = LogCaptor.forRoot(); + // Cluster setup with single connection to ensure that transaction state isn't shared even though connection + // is reused. + final Cluster cluster = TestClientFactory.build(). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + reuseConnectionsForSessions(true). + create(); + try { + logCaptor.setLogLevelToDebug(); + + String channelId = ""; + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + gtx1.addV("person1").iterate(); + gtx1.tx().commit(); + + List<String> lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + // Save the channelId used for this transaction for comparison with the subsequent one. + channelId = idMatcher.group(1); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + final GraphTraversalSource gtx2 = g.tx().begin(); + gtx2.addV("person2").iterate(); + gtx2.tx().commit(); + + lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + // Ensure that same connection was used for both transactions. + assertEquals(channelId, idMatcher.group(1)); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + assertEquals(2, (long) g.V().count().next()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + @Test + public void shouldReuseSameConnectionForSubsequentTransactionAfterRollback() throws Exception { + assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer()); + final LogCaptor logCaptor = LogCaptor.forRoot(); + final Cluster cluster = TestClientFactory.build(). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + reuseConnectionsForSessions(true). + create(); + try { + logCaptor.setLogLevelToDebug(); + + String channelId = ""; + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx1 = g.tx().begin(); + gtx1.addV("person1").iterate(); + gtx1.tx().rollback(); + + List<String> lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + channelId = idMatcher.group(1); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + final GraphTraversalSource gtx2 = g.tx().begin(); + gtx2.addV("person2").iterate(); + gtx2.tx().commit(); + + lines = logCaptor.getLogs(); + for (final String line : lines) { + final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line); + if (idMatcher.find()) { + assertEquals(channelId, idMatcher.group(1)); + } + } + assertClientAndServerSessionResourcesClosed(lines); + + assertEquals(1, (long) g.V().count().next()); + } finally { + cluster.close(); + resetLogCaptor(logCaptor); + } + } + + // Needed to prevent other long running tests that don't require the LogCaptor from accidentally logging. + private void resetLogCaptor(final LogCaptor logCaptor) { + logCaptor.resetLogLevel(); + logCaptor.clearLogs(); + logCaptor.close(); + } + + private void assertClientAndServerSessionResourcesClosed(final List<String> logLines) { + boolean clientSessionClosed = false; + boolean serverSessionClosed = false; + for (final String line : logLines) { + if (line.matches("Session.*closed$")) { + serverSessionClosed = true; + } else if (line.matches("Session closed for Connection.*")) { + clientSessionClosed = true; + } + } + assertTrue(clientSessionClosed); + assertTrue(serverSessionClosed); + } }
