This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch 3.7-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/3.7-dev by this push:
new 274fc53287 Add integration testing for session connection reuse CTR.
274fc53287 is described below
commit 274fc532870049e49c6d656728ba258edbb4013e
Author: Ken Hu <[email protected]>
AuthorDate: Tue Jan 13 16:57:09 2026 -0800
Add integration testing for session connection reuse CTR.
The LogCaptor is added per test to prevent polluting the output of the
integration tests with lots of unnecessary server logs. There are some
inherited tests (namely shouldOpenAndCloseObsceneAmountOfSessions())
which can generate tens of thousands of lines of output.
---
CHANGELOG.asciidoc | 2 +
docs/src/reference/gremlin-variants.asciidoc | 1 +
.../apache/tinkerpop/gremlin/driver/Client.java | 2 +
.../driver/remote/DriverRemoteConnection.java | 5 +-
.../server/GremlinSessionReuseTxIntegrateTest.java | 294 ++++++++++++++++++++-
5 files changed, 302 insertions(+), 2 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 2eb4933d47..40d3eab415 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,8 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Fixed bug in `SubgraphStrategy` where specifying `edges` and `vertices`
filters that had `map`-type steps could generate an error.
* Fixed bug in `ReservedKeysVerificationStrategy` where `AddPropertyStep` was
not triggering proper validations.
* Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate
that the `Session` should be closed on either a successful commit or rollback.
+* Added `SessionedChildClient` that borrows connections from a different
`Client` for use with `Sessions`.
+* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether
to use `SessionedChildClient` for remote transactions.
[[release-3-7-5]]
=== TinkerPop 3.7.5 (Release Date: November 12, 2025)
diff --git a/docs/src/reference/gremlin-variants.asciidoc
b/docs/src/reference/gremlin-variants.asciidoc
index 100c0013a4..99f50f5793 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -864,6 +864,7 @@ The following table describes the various configuration
options for the Gremlin
|path |The URL path to the Gremlin Server. |_/gremlin_
|port |The port of the Gremlin Server to connect to. The same port will be
applied for all hosts. |8192
|protocol |Sets the `AuthProperties.Property.PROTOCOL` properties for
authentication to Gremlin Server. |_none_
+|reuseConnectionsForSessions |Uses a `Client` that will attempt to reuse
`Connections` when managing remote transactions with a
`DriverRemoteConnection`. |false
|serializer.className |The fully qualified class name of the
`MessageSerializer` that will be used to communicate with the server. Note that
the serializer configured on the client should be supported by the server
configuration. |_none_
|serializer.config |A `Map` of configuration settings for the serializer.
|_none_
|username |The username to submit on requests that require authentication.
|_none_
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 258c63b2d7..82c325aa0f 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -820,6 +820,7 @@ public abstract class Client {
if (borrowedConnection == null) {
//Borrow from parentClient's pool instead of creating new
connection
borrowedConnection = parentClient.chooseConnection(msg);
+ logger.debug("SessionedChildClient choosing {}",
borrowedConnection);
}
//Increment everytime, the connection is chosen, all these will be
decremented when transaction is commited/rolledback
borrowedConnection.borrowed.incrementAndGet();
@@ -833,6 +834,7 @@ public abstract class Client {
//Decrement borrowed one last time which was incremented by
parentClient when the connection is borrowed initially
//returnToPool() does this
borrowedConnection.returnToPool();
+ logger.debug("Session closed for {} with count {}",
borrowedConnection, borrowedConnection.borrowed.get());
borrowedConnection = null;
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index 9eb6ad5382..dc1c1c8b06 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -235,7 +235,10 @@ public class DriverRemoteConnection implements
RemoteConnection {
*/
Optional<String> getSessionId() {
if (client instanceof Client.SessionedClient) {
- Client.SessionedClient c = (Client.SessionedClient) client;
+ final Client.SessionedClient c = (Client.SessionedClient) client;
+ return Optional.of(c.getSessionId());
+ } else if (client instanceof Client.SessionedChildClient) {
+ final Client.SessionedChildClient c =
(Client.SessionedChildClient) client;
return Optional.of(c.getSessionId());
}
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
index e49349dee6..44766fb7d5 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java
@@ -18,7 +18,23 @@
*/
package org.apache.tinkerpop.gremlin.server;
+import nl.altindag.log.LogCaptor;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
/**
* Integration tests for gremlin-driver and bytecode sessions where the
underlying connection can be re-used for
@@ -26,9 +42,11 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
*/
public class GremlinSessionReuseTxIntegrateTest extends
AbstractSessionTxIntegrateTest {
+    /**
+     * Matches the driver's "SessionedChildClient choosing ..." debug log line and captures, in group 1, the text
+     * following {@code channel=} inside the braces of the logged connection.
+     */
+    public static final Pattern CHANNEL_ID_PATTERN =
+            Pattern.compile("SessionedChildClient choosing.*\\{channel=(.*)\\}");
+
@Override
protected Cluster createCluster() {
- return TestClientFactory.build().create();
+ return
TestClientFactory.build().reuseConnectionsForSessions(true).create();
}
/**
@@ -43,4 +61,278 @@ public class GremlinSessionReuseTxIntegrateTest extends
AbstractSessionTxIntegra
return settings;
}
+
+    @Test
+    public void shouldCleanupResourcesAfterSuccessfulCommit() throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
+
+        final LogCaptor rootLogs = LogCaptor.forRoot();
+        final Cluster testCluster = createCluster();
+        try {
+            rootLogs.setLogLevelToDebug();
+            final DriverRemoteConnection remote = DriverRemoteConnection.using(testCluster);
+            final GraphTraversalSource g = traversal().withRemote(remote);
+
+            // No vertices are expected before the transaction starts.
+            final long verticesBefore = g.V().count().next();
+            assertEquals(0, verticesBefore);
+
+            // Add a single vertex inside a transaction and commit it.
+            final GraphTraversalSource txSource = g.tx().begin();
+            txSource.addV("person").iterate();
+            txSource.tx().commit();
+
+            // Both the client-side and the server-side "sessions" must have logged that they were closed.
+            final List<String> captured = rootLogs.getLogs();
+            assertClientAndServerSessionResourcesClosed(captured);
+
+            // The committed vertex is visible outside of the transaction.
+            final long verticesAfter = g.V().count().next();
+            assertEquals(1, verticesAfter);
+        } finally {
+            testCluster.close();
+            resetLogCaptor(rootLogs);
+        }
+    }
+
+    @Test
+    public void shouldCleanupResourcesAfterSuccessfulRollback() throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
+
+        final LogCaptor rootLogs = LogCaptor.forRoot();
+        final Cluster testCluster = createCluster();
+        try {
+            rootLogs.setLogLevelToDebug();
+            final DriverRemoteConnection remote = DriverRemoteConnection.using(testCluster);
+            final GraphTraversalSource g = traversal().withRemote(remote);
+
+            // No vertices are expected before the transaction starts.
+            final long verticesBefore = g.V().count().next();
+            assertEquals(0, verticesBefore);
+
+            // Add a single vertex inside a transaction, then discard it.
+            final GraphTraversalSource txSource = g.tx().begin();
+            txSource.addV("person").iterate();
+            txSource.tx().rollback();
+
+            // Both the client-side and the server-side "sessions" must have logged that they were closed.
+            final List<String> captured = rootLogs.getLogs();
+            assertClientAndServerSessionResourcesClosed(captured);
+
+            // The rolled back vertex must not be visible.
+            final long verticesAfter = g.V().count().next();
+            assertEquals(0, verticesAfter);
+        } finally {
+            testCluster.close();
+            resetLogCaptor(rootLogs);
+        }
+    }
+
+    /**
+     * Verifies that a transactional traversal source cannot be used again after its transaction was committed.
+     */
+    @Test
+    public void shouldNotAllowCommittedGtxToBeReused() throws Exception {
+        final Cluster cluster = createCluster();
+        try {
+            final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx = g.tx().begin();
+            gtx.addV("person").iterate();
+            gtx.tx().commit();
+
+            try {
+                gtx.addV("software").iterate();
+                gtx.tx().commit();
+                // fail() raises an AssertionError, which is not an Exception, so the catch below does not hide it.
+                fail("gtx that has already been committed shouldn't be reusable.");
+            } catch (Exception expected) {
+                // Consider any exception to be correct behavior.
+            }
+        } finally {
+            // Release the cluster even when the assertion above fails or an earlier call throws.
+            cluster.close();
+        }
+    }
+
+    /**
+     * Runs two transactions side by side on a default cluster and checks, via the captured driver logs, that
+     * each one was given a different underlying channel.
+     */
+    @Test
+    public void shouldAllowMultipleTransactionsOnDifferentConnection() throws Exception {
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = createCluster();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            final Pattern channelCountPattern = Pattern.compile("Session closed for.*count\\s(.*)");
+            final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            final GraphTraversalSource gtx2 = g.tx().begin();
+
+            gtx1.addV("person").iterate();
+            assertEquals(1, (long) gtx1.V().count().next());
+
+            gtx2.addV("software").iterate();
+            // gtx2 must see its own uncommitted vertex...
+            assertEquals(1, (long) gtx2.V().count().next());
+            // ...while gtx1 must still only see the vertex it added itself.
+            assertEquals(1, (long) gtx1.V().count().next());
+
+            gtx1.tx().commit();
+            gtx2.tx().commit();
+
+            assertEquals(2, (long) g.V().count().next());
+
+            // Ensure that two different underlying connections were used.
+            final Set<String> channelIds = new HashSet<>();
+            final List<String> lines = logCaptor.getLogs();
+            for (String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                final Matcher countMatcher = channelCountPattern.matcher(line);
+                if (idMatcher.find()) {
+                    channelIds.add(idMatcher.group(1));
+                } else if (countMatcher.find()) {
+                    // Check that the client properly updates the borrowed count so that it's reset to zero after
+                    // session closed, but make the check a bit fuzzy since returning is async and so it may not
+                    // hit 0 in time
+                    assertTrue(Integer.parseInt(countMatcher.group(1)) < 2);
+                }
+            }
+
+            assertEquals(2, channelIds.size());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    /**
+     * Runs two overlapping transactions against a cluster limited to a single connection and checks that each
+     * commit only publishes its own writes and logs the closing of its own session resources.
+     */
+    @Test
+    public void shouldAllowMultipleTransactionsOnSameConnection() throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        // Cluster setup that has a single connection so simultaneous transactions must share it.
+        final Cluster cluster = TestClientFactory.
+                build().
+                reuseConnectionsForSessions(true).
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            final GraphTraversalSource gtx2 = g.tx().begin();
+
+            gtx1.addV("person").iterate();
+            gtx2.addV("software").iterate();
+            gtx1.addV("place").iterate();
+
+            assertEquals(0, (long) g.V().count().next());
+            gtx1.tx().commit();
+            assertEquals(2, (long) g.V().count().next());
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+
+            // Drop the lines produced by the first commit; otherwise the check after the second commit would
+            // already be satisfied by them and could never fail.
+            logCaptor.clearLogs();
+
+            gtx2.tx().commit();
+            assertEquals(3, (long) g.V().count().next());
+            assertClientAndServerSessionResourcesClosed(logCaptor.getLogs());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    /**
+     * Commits two transactions one after the other on a single-connection cluster and checks, via the captured
+     * driver logs, that the second transaction was given the same channel as the first.
+     */
+    @Test
+    public void shouldReuseSameConnectionForSubsequentTransactionAfterCommit() throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        // Cluster setup with single connection to ensure that transaction state isn't shared even though
+        // connection is reused.
+        final Cluster cluster = TestClientFactory.build().
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                reuseConnectionsForSessions(true).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            String channelId = "";
+            final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            gtx1.addV("person1").iterate();
+            gtx1.tx().commit();
+
+            List<String> lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // Save the channelId used for this transaction for comparison with the subsequent one.
+                    channelId = idMatcher.group(1);
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            // Discard the first transaction's lines so the checks below only look at the second transaction.
+            logCaptor.clearLogs();
+
+            final GraphTraversalSource gtx2 = g.tx().begin();
+            gtx2.addV("person2").iterate();
+            gtx2.tx().commit();
+
+            boolean secondTxChoseConnection = false;
+            lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // Ensure that same connection was used for both transactions.
+                    assertEquals(channelId, idMatcher.group(1));
+                    secondTxChoseConnection = true;
+                }
+            }
+            // Without this the comparison above would pass vacuously if nothing was logged for gtx2.
+            assertTrue("No connection choice was logged for the second transaction", secondTxChoseConnection);
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            assertEquals(2, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+    /**
+     * Rolls back one transaction and then commits another on a single-connection cluster and checks, via the
+     * captured driver logs, that the second transaction was given the same channel as the first.
+     */
+    @Test
+    public void shouldReuseSameConnectionForSubsequentTransactionAfterRollback() throws Exception {
+        assumeFalse("Test not supported on deprecated UnifiedChannelizer", isUsingUnifiedChannelizer());
+        final LogCaptor logCaptor = LogCaptor.forRoot();
+        final Cluster cluster = TestClientFactory.build().
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                reuseConnectionsForSessions(true).
+                create();
+        try {
+            logCaptor.setLogLevelToDebug();
+
+            String channelId = "";
+            final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
+
+            final GraphTraversalSource gtx1 = g.tx().begin();
+            gtx1.addV("person1").iterate();
+            gtx1.tx().rollback();
+
+            List<String> lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // Remember the channel used by the rolled back transaction.
+                    channelId = idMatcher.group(1);
+                }
+            }
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            // Discard the first transaction's lines so the checks below only look at the second transaction.
+            logCaptor.clearLogs();
+
+            final GraphTraversalSource gtx2 = g.tx().begin();
+            gtx2.addV("person2").iterate();
+            gtx2.tx().commit();
+
+            boolean secondTxChoseConnection = false;
+            lines = logCaptor.getLogs();
+            for (final String line : lines) {
+                final Matcher idMatcher = CHANNEL_ID_PATTERN.matcher(line);
+                if (idMatcher.find()) {
+                    // The same connection must have been used for both transactions.
+                    assertEquals(channelId, idMatcher.group(1));
+                    secondTxChoseConnection = true;
+                }
+            }
+            // Without this the comparison above would pass vacuously if nothing was logged for gtx2.
+            assertTrue("No connection choice was logged for the second transaction", secondTxChoseConnection);
+            assertClientAndServerSessionResourcesClosed(lines);
+
+            // Only the committed vertex remains; the rolled back one was discarded.
+            assertEquals(1, (long) g.V().count().next());
+        } finally {
+            cluster.close();
+            resetLogCaptor(logCaptor);
+        }
+    }
+
+ /**
+ * Resets the log level of the given captor, clears the log lines it has captured and closes it, in that order.
+ * Needed to prevent other long running tests that don't require the LogCaptor from accidentally logging.
+ *
+ * @param logCaptor the captor created by the calling test
+ */
+ private void resetLogCaptor(final LogCaptor logCaptor) {
+ logCaptor.resetLogLevel();
+ logCaptor.clearLogs();
+ logCaptor.close();
+ }
+
+    /**
+     * Asserts that the given log lines contain both a server-side line matching {@code Session.*closed$} and a
+     * client-side line matching {@code Session closed for Connection.*}.
+     *
+     * @param logLines captured log messages to scan
+     */
+    private void assertClientAndServerSessionResourcesClosed(final List<String> logLines) {
+        // Compile each expression once per call rather than once per line, as String.matches() would.
+        final Pattern serverClosedPattern = Pattern.compile("Session.*closed$");
+        final Pattern clientClosedPattern = Pattern.compile("Session closed for Connection.*");
+
+        boolean clientSessionClosed = false;
+        boolean serverSessionClosed = false;
+        for (final String line : logLines) {
+            // A line is only tested against the client pattern when it did not match the server pattern.
+            if (serverClosedPattern.matcher(line).matches()) {
+                serverSessionClosed = true;
+            } else if (clientClosedPattern.matcher(line).matches()) {
+                clientSessionClosed = true;
+            }
+        }
+        assertTrue("No client-side 'Session closed for Connection...' log line was captured", clientSessionClosed);
+        assertTrue("No server-side 'Session ... closed' log line was captured", serverSessionClosed);
+    }
}