This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch tx-reuse-it
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 36ec2928f253b99d12b48d82da4cd68be1eb210e
Author: Ken Hu <[email protected]>
AuthorDate: Tue Jan 13 16:57:09 2026 -0800

    Add integration testing for session connection reuse CTR.
    
    The LogCaptor is added per test to prevent polluting the output of the
    integration tests with lots of unnecessary server logs. There are some
    inherited tests (namely shouldOpenAndCloseObsceneAmountOfSessions())
    which can generate tens of thousands of lines of output.
---
 CHANGELOG.asciidoc                                 |   2 +
 docs/src/reference/gremlin-variants.asciidoc       |   1 +
 .../apache/tinkerpop/gremlin/driver/Client.java    |   2 +
 .../driver/remote/DriverRemoteConnection.java      |   5 +-
 .../server/GremlinSessionReuseTxIntegrateTest.java | 294 ++++++++++++++++++++-
 5 files changed, 302 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 271bc6d921..05c4cad26b 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,8 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 
 * Integrated Python driver examples into automated build process to ensure 
examples remain functional.
 * Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate 
that the `Session` should be closed on either a successful commit or rollback.
+* Added `SessionedChildClient` that borrows connections from a different 
`Client` for use with `Sessions`.
+* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether 
to use `SessionedChildClient` for remote transactions.
 
 [[release-3-7-5]]
 === TinkerPop 3.7.5 (Release Date: November 12, 2025)
diff --git a/docs/src/reference/gremlin-variants.asciidoc 
b/docs/src/reference/gremlin-variants.asciidoc
index 100c0013a4..99f50f5793 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -864,6 +864,7 @@ The following table describes the various configuration 
options for the Gremlin
 |path |The URL path to the Gremlin Server. |_/gremlin_
 |port |The port of the Gremlin Server to connect to. The same port will be 
applied for all hosts. |8192
 |protocol |Sets the `AuthProperties.Property.PROTOCOL` properties for 
authentication to Gremlin Server. |_none_
+|reuseConnectionsForSessions |Uses a `Client` that will attempt to reuse 
`Connections` when managing remote transactions with a 
`DriverRemoteConnection`. |false
 |serializer.className |The fully qualified class name of the 
`MessageSerializer` that will be used to communicate with the server. Note that 
the serializer configured on the client should be supported by the server 
configuration. |_none_
 |serializer.config |A `Map` of configuration settings for the serializer. 
|_none_
 |username |The username to submit on requests that require authentication. 
|_none_
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 258c63b2d7..82c325aa0f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -820,6 +820,7 @@ public abstract class Client {
             if (borrowedConnection == null) {
                 //Borrow from parentClient's pool instead of creating new 
connection
                 borrowedConnection = parentClient.chooseConnection(msg);
+                logger.debug("SessionedChildClient choosing {}", 
borrowedConnection);
             }
             //Increment everytime, the connection is chosen, all these will be 
decremented when transaction is commited/rolledback
             borrowedConnection.borrowed.incrementAndGet();
@@ -833,6 +834,7 @@ public abstract class Client {
                 //Decrement borrowed one last time which was incremented by 
parentClient when the connection is borrowed initially
                 //returnToPool() does this
                 borrowedConnection.returnToPool();
+                logger.debug("Session closed for {} with count {}", 
borrowedConnection, borrowedConnection.borrowed.get());
 
                 borrowedConnection = null;
             }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index 9eb6ad5382..dc1c1c8b06 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -235,7 +235,10 @@ public class DriverRemoteConnection implements 
RemoteConnection {
      */
     Optional<String> getSessionId() {
         if (client instanceof Client.SessionedClient) {
-            Client.SessionedClient c = (Client.SessionedClient) client;
+            final Client.SessionedClient c = (Client.SessionedClient) client;
+            return Optional.of(c.getSessionId());
+        } else if (client instanceof Client.SessionedChildClient) {
+            final Client.SessionedChildClient c = 
(Client.SessionedChildClient) client;
             return Optional.of(c.getSessionId());
         }
 
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
index e49349dee6..44766fb7d5 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
@@ -18,7 +18,23 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import nl.altindag.log.LogCaptor;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 
 /**
  * Integration tests for gremlin-driver and bytecode sessions where the 
underlying connection can be re-used for
@@ -26,9 +42,11 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
  */
 public class GremlinSessionReuseTxIntegrateTest extends 
AbstractSessionTxIntegrateTest {
 
+    public static Pattern CHANNEL_ID_PATTERN = 
Pattern.compile("SessionedChildClient choosing.*\\{channel=(.*)\\}");
+
     @Override
     protected Cluster createCluster() {
-        return TestClientFactory.build().create();
+        return 
TestClientFactory.build().reuseConnectionsForSessions(true).create();
     }
 
     /**
@@ -43,4 +61,278 @@ public class GremlinSessionReuseTxIntegrateTest extends 
AbstractSessionTxIntegra
 
         return settings;
     }
+
+    @Test
+    public void shouldCleanupResourcesAfterSuccessfulCommit() throws Exception 
{
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", 
isUsingUnifiedChannelizer());
+
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = createCluster();
+        try {
+            logCaptor.setLogLevelToDebug();
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            assertEquals(0, (long) g.V().count().next());
+
+            final GraphTraversalSource gtx = g.tx().begin();
+            gtx.addV("person").iterate();
+            gtx.tx().commit();
+
+            // Check the both client and server side "sessions" are closed.
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+            assertEquals(1, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    @Test
+    public void shouldCleanupResourcesAfterSuccessfulRollback() throws 
Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", 
isUsingUnifiedChannelizer());
+
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = createCluster();
+        try {
+            logCaptor.setLogLevelToDebug();
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            assertEquals(0, (long) g.V().count().next());
+
+            final GraphTraversalSource gtx = g.tx().begin();
+            gtx.addV("person").iterate();
+            gtx.tx().rollback();
+
+            // Check the both client and server side "sessions" are closed.
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+            assertEquals(0, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    @Test
+    public void shouldNotAllowCommittedGtxToBeReused() throws Exception {
+        final Cluster cluster = createCluster();
+        final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+        final GraphTraversalSource gtx = g.tx().begin();
+        gtx.addV("person").iterate();
+        gtx.tx().commit();
+
+        try {
+            gtx.addV("software").iterate();
+            gtx.tx().commit();
+            fail("gtx that has already been committed shouldn't be reusable.");
+        } catch (Exception e) {
+            // Consider any exception to be correct behavior.
+        }
+
+        cluster.close();
+    }
+
+    @Test
+    public void shouldAllowMultipleTransactionsOnDifferentConnection() throws 
Exception {
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = createCluster();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            final Pattern channelCountPattern = Pattern.compile("Session 
closed for.*count\\s(.*)");
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            final GraphTraversalSource gtx2 = g.tx().begin();
+
+            gtx1.addV("person").iterate();
+            assertEquals(1, (long) gtx1.V().count().next());
+
+            gtx2.addV("software").iterate();
+            assertEquals(1, (long) gtx1.V().count().next());
+
+            gtx1.tx().commit();
+            gtx2.tx().commit();
+
+            assertEquals(2, (long) g.V().count().next());
+
+            // Ensure that two different underlying connections were used.
+            final Set<String> channelIds = new HashSet<>();
+            final List<String> lines = logCaptor.getLogs();
+            for (String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                final Matcher countMatcher = channelCountPattern.matcher(line);
+                if (idMatcher.find()) {
+                    channelIds.add(idMatcher.group(1));
+                } else if (countMatcher.find()) {
+                    // Check that the client properly updates the borrowed 
count so that it's reset to zero after session
+                    // closed, but make the check a bit fuzzy since returning 
is async and so it may not hit 0 in time
+                    assertTrue(Integer.parseInt(countMatcher.group(1)) < 2);
+                }
+            }
+
+            assertEquals(2, channelIds.size());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    @Test
+    public void shouldAllowMultipleTransactionsOnSameConnection() throws 
Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", 
isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        // Cluster setup that has a single connection so simultaneous 
transactions must share it.
+        final Cluster cluster = TestClientFactory.
+                build().
+                reuseConnectionsForSessions(true).
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            final GraphTraversalSource gtx2 = g.tx().begin();
+
+            gtx1.addV("person").iterate();
+            gtx2.addV("software").iterate();
+            gtx1.addV("place").iterate();
+
+            assertEquals(0, (long) g.V().count().next());
+            gtx1.tx().commit();
+            assertEquals(2, (long) g.V().count().next());
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+
+            gtx2.tx().commit();
+            assertEquals(3, (long) g.V().count().next());
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    @Test
+    public void shouldReuseSameConnectionForSubsequentTransactionAfterCommit() 
throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", 
isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        // Cluster setup with single connection to ensure that transaction 
state isn't shared even though connection
+        // is reused.
+        final Cluster cluster = TestClientFactory.build().
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                reuseConnectionsForSessions(true).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            String channelId = "";
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            gtx1.addV("person1").iterate();
+            gtx1.tx().commit();
+
+            List<String> lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // Save the channelId used for this transaction for 
comparison with the subsequent one.
+                    channelId = idMatcher.group(1);
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            final GraphTraversalSource gtx2 = g.tx().begin();
+            gtx2.addV("person2").iterate();
+            gtx2.tx().commit();
+
+            lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // Ensure that same connection was used for both 
transactions.
+                    assertEquals(channelId, idMatcher.group(1));
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            assertEquals(2, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    @Test
+    public void 
shouldReuseSameConnectionForSubsequentTransactionAfterRollback() throws 
Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", 
isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = TestClientFactory.build().
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                reuseConnectionsForSessions(true).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            String channelId = "";
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            gtx1.addV("person1").iterate();
+            gtx1.tx().rollback();
+
+            List<String> lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    channelId = idMatcher.group(1);
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            final GraphTraversalSource gtx2 = g.tx().begin();
+            gtx2.addV("person2").iterate();
+            gtx2.tx().commit();
+
+            lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    assertEquals(channelId, idMatcher.group(1));
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            assertEquals(1, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    // Needed to prevent other long running tests that don't require the 
LogCaptor from accidentally logging.
+    private void resetLogCaptor(final LogCaptor logCaptor) {
+        logCaptor.resetLogLevel();
+        logCaptor.clearLogs();
+        logCaptor.close();
+    }
+
+    private void assertClientAndServerSessionResourcesClosed(final 
List<String> logLines) {
+        boolean clientSessionClosed = false;
+        boolean serverSessionClosed = false;
+        for (final String line : logLines) {
+            if (line.matches("Session.*closed$")) {
+                serverSessionClosed = true;
+            } else if (line.matches("Session closed for Connection.*")) {
+                clientSessionClosed = true;
+            }
+        }
+        assertTrue(clientSessionClosed);
+        assertTrue(serverSessionClosed);
+    }
 }

Reply via email to