This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch TINKERPOP-3217 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 8221c11e5b41fb026f4cb29ab1af3c99f674c0c8 Author: Ken Hu <[email protected]> AuthorDate: Mon Dec 1 12:16:13 2025 -0800 TINKERPOP-3217 Add server option to close Session automatically. The added destroySessionPostGraphOp setting enables re-using the same underlying connection for a different subsequent Session. This should increase performance for cases where many short-lived Transactions are sent to the server. --- CHANGELOG.asciidoc | 2 + docs/src/reference/gremlin-applications.asciidoc | 1 + .../apache/tinkerpop/gremlin/server/Settings.java | 8 + .../gremlin/server/handler/AbstractSession.java | 4 + .../server/op/session/SessionOpProcessor.java | 18 + .../server/GremlinSessionReuseTxIntegrateTest.java | 411 +++++++++++++++++++++ 6 files changed, 444 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b292f8939a..350b26e5da 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,7 +22,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima [[release-3-7-6]] === TinkerPop 3.7.6 (NOT OFFICIALLY RELEASED YET) + * Integrated Python driver examples into automated build process to ensure examples remain functional. +* Added `destroySessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback. [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: November 12, 2025) diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 0eee376fab..4143bbfa76 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -1015,6 +1015,7 @@ The following table describes the various YAML configuration options that Gremli |authorization.authorizer |The fully qualified classname of an `Authorizer` implementation to use. |_none_ |authorization.config |A `Map` of configuration settings to be passed to the `Authorizer` when it is constructed. The settings available are dependent on the implementation. |_none_ |channelizer |The fully qualified classname of the `Channelizer` implementation to use. A `Channelizer` is a "channel initializer" which Gremlin Server uses to define the type of processing pipeline to use. By allowing different `Channelizer` implementations, Gremlin Server can support different communication protocols (e.g. WebSocket). |`WebSocketChannelizer` +|destroySessionPostGraphOp |Controls whether a `Session` will be closed by the server after a successful TX_COMMIT or TX_ROLLBACK bytecode request. |_false_ |enableAuditLog |The `AuthenticationHandler`, `AuthorizationHandler` and processors can issue audit logging messages with the authenticated user, remote socket address and requests with a gremlin query. For privacy reasons, the default value of this setting is false. The audit logging messages are logged at the INFO level via the `audit.org.apache.tinkerpop.gremlin.server` logger, which can be configured using the `logback.xml` file. |_false_ |graphManager |The fully qualified classname of the `GraphManager` implementation to use. A `GraphManager` is a class that adheres to the TinkerPop `GraphManager` interface, allowing custom implementations for storing and managing graph references, as well as defining custom methods to open and close graphs instantiations. To prevent Gremlin Server from starting when all graphs fails, the `CheckedGraphManager` can be used.|`DefaultGraphManager` |graphs |A `Map` of `Graph` configuration files where the key of the `Map` becomes the name to which the `Graph` will be bound and the value is the file name of a `Graph` configuration file. |_none_ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 50e7444b2f..e293a40d1e 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -189,6 +189,14 @@ public class Settings { */ public boolean strictTransactionManagement = false; + /** + * If set to {@code true} the Gremlin Server will destroy the session when a GraphOp (commit or rollback) is + * successfully completed on that session. + * + * NOTE: Defaults to false in 3.7.6/3.8.1 to prevent breaking change. + */ + public boolean destroySessionPostGraphOp = false; + /** * The full class name of the {@link Channelizer} to use in Gremlin Server. */ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java index 6680127d51..5b93bf9f7d 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/AbstractSession.java @@ -699,6 +699,10 @@ public abstract class AbstractSession implements Session, AutoCloseable { .code(ResponseStatusCode.NO_CONTENT) .statusAttributes(attributes) .create()); + + if (sessionTask.getSettings().destroySessionPostGraphOp) { + close(); + } } else { throw new IllegalStateException(String.format( "Bytecode in request is not a recognized graph operation: %s", bytecode.toString())); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java index e10a0f1dd0..b9f9ca08b7 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java @@ -141,6 +141,9 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor { }}; } + // Determines whether to destroy the session after a successful COMMIT/ROLLBACK. Set during init(). + private boolean destroySessionPostGraphOp; + public SessionOpProcessor() { super(false); } @@ -154,6 +157,7 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor { public void init(final Settings settings) { this.maxParameters = (int) settings.optionalProcessor(SessionOpProcessor.class).orElse(DEFAULT_SETTINGS).config. getOrDefault(CONFIG_MAX_PARAMETERS, DEFAULT_MAX_PARAMETERS); + this.destroySessionPostGraphOp = settings.destroySessionPostGraphOp; } /** @@ -546,6 +550,13 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor { .statusAttributes(attributes) .create()); + if (destroySessionPostGraphOp) { + // Setting force to true prevents deadlock when this thread attempts to destroy the session. + // This should be safe since either a commit or rollback just finished so the transaction + // shouldn't be open. + session.manualKill(true); + } + } catch (Throwable t) { onError(graph, context); // if any exception in the chain is TemporaryException or Failure then we should respond with the @@ -571,6 +582,13 @@ public class SessionOpProcessor extends AbstractEvalOpProcessor { .statusMessage(t.getMessage()) .statusAttributeException(t).create()); } + + if (destroySessionPostGraphOp) { + // Destroy the session after a successful rollback due to error. Placed here rather than + // in a finally block since we don't want to end the session if no commit/rollback succeeded. + session.manualKill(true); + } + if (t instanceof Error) { //Re-throw any errors to be handled by and set as the result the FutureTask throw t; diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java new file mode 100644 index 0000000000..e6adf74fa0 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinSessionReuseTxIntegrateTest.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor; +import org.apache.tinkerpop.gremlin.util.ExceptionHelper; +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Integration tests for gremlin-driver and bytecode sessions. + * + * NOTE: this is effectively a copy of GremlinSessionTxIntegrateTest but the expectation is that the gremlin-driver + * will be configured in a way that allows connection re-use as the server allows it via the "destroySessionPostGraphOp" + * setting. + */ +public class GremlinSessionReuseTxIntegrateTest extends AbstractGremlinServerIntegrationTest { + + /** + * Configure specific Gremlin Server settings for specific tests. + */ + @Override + public Settings overrideSettings(final Settings settings) { + final String nameOfTest = name.getMethodName(); + + settings.graphs.put("graph", "conf/tinkertransactiongraph-empty.properties"); + + // Set this as it is what allows connection re-use on the server side. + settings.destroySessionPostGraphOp = true; + + switch (nameOfTest) { + case "shouldExecuteBytecodeInSession": + break; + case "shouldTimeoutTxBytecode": + settings.processors.clear(); + + // OpProcessor setting + final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings(); + processorSettings.className = SessionOpProcessor.class.getCanonicalName(); + processorSettings.config = new HashMap<>(); + processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L); + settings.processors.add(processorSettings); + + // Unified setting + settings.sessionLifetimeTimeout = 3000L; + break; + } + + return settings; + } + + @Test + @Ignore("TINKERPOP-2832") + public void shouldTimeoutTxBytecode() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("person").addE("link").iterate(); + gtx.tx().commit(); + + assertEquals(1L, g.V().count().next().longValue()); + assertEquals(1L, g.E().count().next().longValue()); + + try { + gtx = g.tx().begin(); + + assertEquals(1L, gtx.V().count().next().longValue()); + assertEquals(1L, gtx.E().count().next().longValue()); + + // wait long enough for the session to die + Thread.sleep(4000); + + // the following should fail with a dead session + gtx.V().count().iterate(); + fail("Session is dead - a new one should not reopen to server this request"); + } catch (Exception ex) { + ex.printStackTrace(); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSession() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + + gtx.tx().commit(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionWithExplicitTransactionObject() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + final Transaction tx = g.tx(); + assertThat(tx.isOpen(), is(true)); + + final GraphTraversalSource gtx = tx.begin(); + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + tx.commit(); + assertThat(tx.isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + cluster.close(); + } + + @Test + public void shouldRollbackTxBytecodeInSession() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.tx().rollback(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(0, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionOnCloseOfGtx() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.close(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionOnCloseTx() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + gtx.addV("person").iterate(); + assertEquals(1, (long) gtx.V().count().next()); + gtx.close(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldOpenAndCloseObsceneAmountOfSessions() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + // need to open significantly more sessions that we have threads in gremlinPool. if we go too obscene on + // OpProcessor this test will take too long + final int numberOfSessions = isUsingUnifiedChannelizer() ? 1000 : 100; + for (int ix = 0; ix < numberOfSessions; ix ++) { + final Transaction tx = g.tx(); + final GraphTraversalSource gtx = tx.begin(); + try { + final Vertex v1 = gtx.addV("person").property("pid", ix + "a").next(); + final Vertex v2 = gtx.addV("person").property("pid", ix + "b").next(); + gtx.addE("knows").from(v1).to(v2).iterate(); + tx.commit(); + } catch (Exception ex) { + tx.rollback(); + fail("Should not expect any failures"); + } finally { + assertThat(tx.isOpen(), is(false)); + } + } + + // sessionless connections should still be good - close() should not affect that + assertEquals(numberOfSessions * 2, (long) g.V().count().next()); + assertEquals(numberOfSessions, (long) g.E().count().next()); + + cluster.close(); + } + + @Test + public void shouldCommitTxBytecodeInSessionReusingGtxAcrossThreads() throws Exception { + + final ExecutorService service = Executors.newFixedThreadPool(2); + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final GraphTraversalSource gtx = g.tx().begin(); + assertThat(gtx.tx().isOpen(), is(true)); + + final int verticesToAdd = 64; + for (int ix = 0; ix < verticesToAdd; ix++) { + service.submit(() -> gtx.addV("person").iterate()); + } + + service.shutdown(); + service.awaitTermination(90000, TimeUnit.MILLISECONDS); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + + assertEquals(verticesToAdd, (long) gtx.V().count().next()); + gtx.tx().commit(); + assertThat(gtx.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(verticesToAdd, (long) g.V().count().next()); + + cluster.close(); + } + + @Test + public void shouldSpawnMultipleTraversalSourceInSameTransaction() throws Exception { + + final Cluster cluster = TestClientFactory.build().create(); + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + + final Transaction tx1 = g.tx(); + final GraphTraversalSource gtx1a = tx1.begin(); + final GraphTraversalSource gtx1b = tx1.begin(); + final Transaction tx2 = g.tx(); + final GraphTraversalSource gtx2 = tx2.begin(); + + gtx1a.addV("person").iterate(); + assertEquals(1, (long) gtx1a.V().count().next()); + assertEquals(1, (long) gtx1b.V().count().next()); + + // outside the session we should be at zero + assertEquals(0, (long) g.V().count().next()); + assertEquals(0, (long) gtx2.V().count().next()); + + // either can commit to end the transaction + gtx1b.tx().commit(); + assertThat(gtx1a.tx().isOpen(), is(false)); + assertThat(gtx1b.tx().isOpen(), is(false)); + + // sessionless connections should still be good - close() should not affect that + assertEquals(1, (long) g.V().count().next()); + + // but the spawned gtx should be dead + try { + gtx1a.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + try { + gtx1b.addV("software").iterate(); + fail("Should have failed since we committed the transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertEquals("Client is closed", root.getMessage()); + } + + cluster.close(); + } + + @Test + public void shouldCommitRollbackInScriptUsingGremlinLang() throws Exception { + final Cluster cluster = TestClientFactory.open(); + final Client.SessionSettings sessionSettings = Client.SessionSettings.build(). + sessionId(name.getMethodName()). + manageTransactions(false). + maintainStateAfterException(false). + create(); + final Client.Settings clientSettings = Client.Settings.build().useSession(sessionSettings).create(); + final Client client = cluster.connect(); + final Client session = cluster.connect(clientSettings); + + // this test mixes calls across scriptengines - probably not a use case but interesting + try { + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + + // outside of session graph should be empty still but in session we should have 2 + assertEquals(0, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // commit whats there using gremlin-language and test again + session.submit("g.tx().commit()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // add one more in session and test + session.submit("g.addV('person')").all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(3, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + + // rollback the additional one and test + session.submit("g.tx().rollback()", RequestOptions.build().language("gremlin-lang").create()).all().get(10, TimeUnit.SECONDS); + assertEquals(2, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + assertEquals(2, session.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt()); + } finally { + cluster.close(); + } + } +}
