This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new c0ac6cb256 Add transaction closure methods to GLVs (#3464)
c0ac6cb256 is described below
commit c0ac6cb256464580b4aa0ed6f979a9ad857b5ec2
Author: Ken Hu <[email protected]>
AuthorDate: Fri Jun 26 14:33:17 2026 -0700
Add transaction closure methods to GLVs (#3464)
Adds executeInTx/evaluateInTx to GraphTraversalSource across all GLVs to
wrap the begin/commit/rollback lifecycle, so a lambda receives the
transaction-bound g and the transaction is auto-committed on success or
rolled back on error.

The methods live on GraphTraversalSource rather than on the Transaction
returned by g.tx() because the closure is a Traversal-API convenience;
hosting it on the Transaction would hand Driver-API users (who submit
strings) a traversal source and mix the two APIs. Keeping it on g also
lets Java's existing g.tx() routing cover both embedded and remote with no
interface changes. The value-returning form is named evaluateInTx rather
than call because call already exists on GraphTraversalSource as the
service step, and the InTx suffix keeps the transactional intent clear now
that the methods no longer sit under tx().

The surface follows each language's idiom rather than forcing uniformity:
dynamic languages expose a single method, while statically typed languages
get a void/value pair. Go returns interface{} instead of using generics,
to match the driver's existing untyped result API.

Behavior is single-shot with no retry, since Gremlin Server has no
standardized retriable-error signal across providers. On commit failure a
rollback is still attempted for server-side resource hygiene, but the
original error stays primary and secondary cleanup failures are only
logged. gtx.tx() still returns the same transaction so the commit path
keeps working; only opening a second transaction errors.
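
The two-phase lifecycle described above (run the body, then commit, with a
best-effort rollback on either failure) can be sketched roughly as follows.
This is an illustration only, not the gremlin_python implementation: it is
written as a free function rather than a method on GraphTraversalSource,
and FakeTx and FakeSource are hypothetical stand-ins for a transaction and
a traversal source so that the sketch runs without a server.

```python
import logging

logger = logging.getLogger(__name__)


class FakeTx:
    """Hypothetical stand-in for a transaction; records lifecycle calls."""

    def __init__(self, fail_commit=False):
        self.fail_commit = fail_commit
        self.calls = []

    def begin(self):
        self.calls.append("begin")
        return "gtx"  # stands in for the transaction-bound source

    def commit(self):
        self.calls.append("commit")
        if self.fail_commit:
            raise RuntimeError("commit failed")

    def rollback(self):
        self.calls.append("rollback")


class FakeSource:
    """Hypothetical stand-in for g; tx() hands back the given transaction."""

    def __init__(self, tx):
        self._tx = tx

    def tx(self):
        return self._tx


def _rollback_quietly(tx, reason):
    # A cleanup failure is only logged so it never replaces the primary error.
    try:
        tx.rollback()
    except Exception:
        logger.warning("Rollback failed after %s", reason, exc_info=True)


def execute_in_tx(g, tx_work):
    """Run tx_work(gtx) exactly once inside a transaction (no retry)."""
    tx = g.tx()
    gtx = tx.begin()
    # Phase 1: run the body; on error roll back and re-raise the body error.
    try:
        result = tx_work(gtx)
    except BaseException:
        _rollback_quietly(tx, "transaction body error")
        raise
    # Phase 2: commit; on failure still roll back, then re-raise the commit error.
    try:
        tx.commit()
    except Exception:
        _rollback_quietly(tx, "commit failure")
        raise
    return result
```

Keeping the body and the commit in separate try blocks means a failed body
never reaches the commit, and in both failure paths the error the caller
sees is the first one raised.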
Assisted-by: Claude Code:claude-opus-4-8
---
CHANGELOG.asciidoc | 1 +
docs/src/reference/gremlin-variants.asciidoc | 121 +++++++++++++
docs/src/reference/the-traversal.asciidoc | 52 +++++-
docs/src/upgrade/release-4.x.x.asciidoc | 27 +++
.../traversal/dsl/graph/GraphTraversalSource.java | 93 ++++++++++
.../Process/Traversal/GraphTraversalSource.cs | 105 +++++++++++
.../Driver/TransactionTests.cs | 121 +++++++++++++
gremlin-go/driver/graphTraversalSource.go | 106 ++++++++++++
gremlin-go/driver/logger.go | 1 +
.../driver/resources/logger-messages/en.json | 3 +-
gremlin-go/driver/transaction.go | 13 ++
gremlin-go/driver/transaction_test.go | 191 ++++++++++++++++++++-
.../lib/process/graph-traversal.ts | 68 ++++++++
.../test/integration/transaction-tests.js | 114 ++++++++++++
.../gremlin_python/process/graph_traversal.py | 53 ++++++
.../tests/integration/driver/test_transaction.py | 86 ++++++++++
.../GremlinDriverTransactionIntegrateTest.java | 125 ++++++++++++++
.../gremlin/structure/TransactionTest.java | 88 ++++++++++
18 files changed, 1362 insertions(+), 6 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0fe949254e..a4ad0e1e20 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -35,6 +35,7 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added Provider Defined Types (PDT) support — graph providers can define
custom types via `@ProviderDefined` annotation that serialize/deserialize
seamlessly across all GLVs without driver-side configuration. Replaces TP3
custom type mechanism.
* Added Gremlator, a single page web application, that translates Gremlin into
various programming languages like Javascript and Python.
* Added explicit transaction support to all non-Java GLVs (gremlin-python,
gremlin-go, gremlin-javascript, gremlin-dotnet).
+* Added transaction closure methods (`executeInTx`/`evaluateInTx`) on
`GraphTraversalSource` in all GLVs that wrap the begin/commit/rollback
lifecycle, so a lambda receives the transaction-bound `g` and is auto-committed
on success or rolled back on error.
* Changed default transaction close behavior from commit to rollback across
all GLVs to align with embedded graph defaults.
* Refactored Go driver connection to block until response headers arrive,
enabling synchronous error returns and proper transaction ordering.
* Removed `uuid` dependency from `gremlin-javascript` in favor of the built-in
`globalThis.crypto.randomUUID()`.
diff --git a/docs/src/reference/gremlin-variants.asciidoc
b/docs/src/reference/gremlin-variants.asciidoc
index 36ce3b480e..39167b4e25 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -354,6 +354,34 @@ err = tx.Commit()
if err != nil { log.Fatal(err) }
----
+==== Managed Transaction Blocks
+
+To avoid the manual begin/commit/rollback boilerplate, `g` provides closure
methods that manage the
+lifecycle: the closure receives the transaction-bound `gtx` and the
transaction is committed when it returns a `nil`
+error or rolled back if it returns an error (or panics). `ExecuteInTx` is for
the common no-result case; `EvaluateInTx`
+returns the value the body produces as `interface{}`, which the caller
type-asserts:
+
+[source,go]
+----
+g := gremlingo.Traversal_().With(remote)
+
+// ExecuteInTx: no return value
+err := g.ExecuteInTx(func(gtx *gremlingo.GraphTraversalSource) error {
+ _, e := gtx.AddV("person").Property("name", "alice").Iterate()
+ return <-e
+})
+
+// EvaluateInTx: returns the body's value as interface{}
+v, err := g.EvaluateInTx(func(gtx *gremlingo.GraphTraversalSource) (interface{}, error) {
+ return gtx.V().Count().Next()
+})
+count := v.(int64)
+----
+
+The transaction runs exactly once (no automatic retry). The body's error is
returned after the rollback; if the commit
+fails, a rollback is still attempted and the commit error is returned. A panic
in the body rolls back and then
+re-panics.
+
==== Driver API
[source,go]
@@ -1055,6 +1083,31 @@ try {
Traversals spawned from `gtx` are bound to the transaction. The driver handles
host pinning and transaction ID
propagation automatically.
+==== Managed Transaction Blocks
+
+To avoid the manual begin/commit/rollback boilerplate, `g` also provides
closure methods that manage the
+transaction lifecycle. The closure receives the transaction-bound `gtx`, and
the transaction is committed when the
+closure completes normally or rolled back if it throws. Use `executeInTx` when
the body returns nothing and
+`evaluateInTx` when it returns a value:
+
+[source,java]
+----
+GraphTraversalSource g = traversal().with(DriverRemoteConnection.using("localhost", 8182, "g"));
+
+// executeInTx: no return value
+g.executeInTx(gtx -> {
+ gtx.addV("person").property("name", "jorge").iterate();
+ gtx.addV("person").property("name", "josh").iterate();
+});
+
+// evaluateInTx: returns the body's value
+long count = g.evaluateInTx(gtx -> gtx.V().count().next());
+----
+
+The transaction runs exactly once (no automatic retry). If the closure throws,
the original exception is re-thrown
+after the rollback; if the commit fails, a rollback is still attempted to
release server-side resources and the commit
+error propagates. These methods are also available on embedded graphs that
support transactions.
+
==== Driver API
For script-based usage or when working with the `Client` API directly,
transactions can be created from the `Cluster`:
@@ -2017,6 +2070,29 @@ await gtx.addV("person").property("name",
"josh").iterate();
await tx.commit();
----
+==== Managed Transaction Blocks
+
+To avoid the manual begin/commit/rollback boilerplate, `g` provides an
`executeInTx` method that manages the
+lifecycle. The callback receives the transaction-bound `gtx` and may be
`async`; the transaction is committed when the
+callback resolves or rolled back if it throws. `executeInTx` resolves to
whatever the callback returns:
+
+[source,javascript]
+----
+const g = traversal().with_(new DriverRemoteConnection('http://localhost:8182/gremlin'));
+
+// no return value
+await g.executeInTx(async (gtx) => {
+ await gtx.addV("person").property("name", "jorge").iterate();
+ await gtx.addV("person").property("name", "josh").iterate();
+});
+
+// returns the callback's value
+const count = await g.executeInTx((gtx) => gtx.V().count().next());
+----
+
+The transaction runs exactly once (no automatic retry). If the callback
rejects, the original error is re-thrown after
+the rollback; if the commit fails, a rollback is still attempted and the
commit error propagates.
+
==== Driver API
[source,javascript]
@@ -2656,6 +2732,31 @@ await gtx.AddV("person").Property("name",
"josh").Promise(t => t.Iterate());
await tx.CommitAsync();
----
+==== Managed Transaction Blocks
+
+To avoid the manual begin/commit/rollback boilerplate, `g` provides closure
methods that manage the
+lifecycle. The callback receives the transaction-bound `gtx` and the
transaction is committed when it completes or
+rolled back if it throws. Use `ExecuteInTxAsync` when the body returns nothing
and `EvaluateInTxAsync` when it returns
+a value; both accept an optional `CancellationToken`:
+
+[source,csharp]
+----
+var g = AnonymousTraversalSource.Traversal().With(new DriverRemoteConnection("localhost", 8182));
+
+// ExecuteInTxAsync: no return value
+await g.ExecuteInTxAsync(async gtx =>
+{
+ await gtx.AddV("person").Property("name", "jorge").Promise(t => t.Iterate());
+ await gtx.AddV("person").Property("name", "josh").Promise(t => t.Iterate());
+});
+
+// EvaluateInTxAsync: returns the body's value
+var count = await g.EvaluateInTxAsync(gtx => gtx.V().Count().Promise(t => t.Next()));
+----
+
+The transaction runs exactly once (no automatic retry). If the callback
throws, the original exception is re-thrown
+after the rollback; if the commit fails, a rollback is still attempted and the
commit error propagates.
+
==== Driver API
[source,csharp]
@@ -3310,6 +3411,26 @@ gtx.addV('person').property('name', 'josh').iterate()
tx.commit()
----
+==== Managed Transaction Blocks
+
+To avoid the manual begin/commit/rollback boilerplate, `g` provides an
`execute_in_tx` method that manages the
+lifecycle. The function receives the transaction-bound `gtx` and the
transaction is committed when it returns or rolled
+back if it raises. `execute_in_tx` returns whatever the function returns:
+
+[source,python]
+----
+g = traversal().with_remote(DriverRemoteConnection('http://localhost:8182/gremlin', 'g'))
+
+# no return value
+g.execute_in_tx(lambda gtx: gtx.addV('person').property('name', 'jorge').iterate())
+
+# returns the function's value
+count = g.execute_in_tx(lambda gtx: gtx.V().count().next())
+----
+
+The transaction runs exactly once (no automatic retry). If the function
raises, the original exception is re-raised
+after the rollback; if the commit fails, a rollback is still attempted and the
commit error propagates.
+
==== Driver API
[source,python]
diff --git a/docs/src/reference/the-traversal.asciidoc
b/docs/src/reference/the-traversal.asciidoc
index b8886a651a..b085b4a084 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -60,8 +60,54 @@ traversal strategies may not function properly.
image:gremlin-coins.png[width=100,float=right] A
link:http://en.wikipedia.org/wiki/Database_transaction[database transaction]
represents a unit of work to execute against the database. A traversals unit
of work is affected by usage convention
(i.e. the method of <<connecting-gremlin, connecting>>) and the graph
provider's transaction model. Without diving
-deeply into different conventions and models the most general and recommended
approach to working with transactions is
-demonstrated as follows:
+deeply into different conventions and models, the recommended way to work with
transactions is to wrap the unit of
+work in a closure passed to `executeInTx` (or `evaluateInTx` when a value must
be returned), which manages the
+transaction lifecycle for you:
+
+[source,java]
+----
+GraphTraversalSource g = traversal().with(graph);
+// or
+GraphTraversalSource g = traversal().with(conn);
+
+// the closure receives the transaction-bound gtx; the transaction is begun
+// automatically, committed when the closure completes normally, and rolled
+// back if it throws
+g.executeInTx(gtx -> {
+ gtx.addV("person").iterate();
+ gtx.addV("software").iterate();
+});
+
+// use evaluateInTx when the unit of work needs to return a value
+long count = g.evaluateInTx(gtx -> gtx.V().count().next());
+----
+
+This closure form is preferred because it guarantees correctness by default:
the transaction is always committed on
+success and rolled back on any error (or other abnormal exit), so partial work
is never left dangling and there is no
+way to forget a `commit()` or a `rollback()` in an error path. It also reduces
boilerplate, collapsing the common
+begin/do-work/commit cycle into a single call and removing the repetitive
try/catch lifecycle management that is easy
+to get subtly wrong. Because the closure receives `gtx` as its argument, the
transactional source is the one already
+in hand for the unit of work, making it natural to run traversals against
`gtx` rather than accidentally against the
+non-transactional `g` (which would execute as a separate implicit transaction).
+
+In Java, a `GraphTraversalSource` can be backed by either a remote connection
or an embedded graph, and the two model
+transactions differently. In the remote case, `g.tx()` returns a distinct
transaction object each time, keyed by a
+server-side transaction ID, and the `gtx` spawned from it is a genuinely
separate `GraphTraversalSource` from `g`.
+Issuing a traversal against `g` instead of `gtx` silently runs as its own
implicit transaction. In the embedded case,
+`g.tx()` returns a thread-bound transaction (the traditional Java model in
which the transaction is associated with the
+current thread), so `g` and `gtx` actually share the same transaction scope.
Because the closure hands you the correct
+transaction-bound `gtx` and owns the lifecycle, the exact same code is correct
under both paradigms; you do not have to
+reason about whether `g` and `gtx` refer to the same underlying transaction.
(The non-Java GLVs are always remote, so
+this distinction does not arise there.)
+
+The closure runs the unit of work exactly once (there is no automatic retry).
If the closure throws, the original
+error is re-raised after the rollback. If the `commit()` itself fails, the
commit error is raised and a rollback is
+still attempted to release server-side resources.
+
+Manual transaction control (calling `begin()`, `commit()`, and `rollback()`
yourself, shown below) remains fully
+supported, but it should only be reached for when the closure form does not
fit the use case. Examples include when the
+transaction must stay open across several independent units of work, when its
lifetime is driven by external control
+flow that cannot be expressed as a single closure, or when interleaving work
across multiple open transactions:
[source,java]
----
@@ -84,7 +130,7 @@ try {
}
----
-The above example is straightforward and represents a good starting point for
discussing the nuances of transactions
+The manual example above is a good starting point for discussing the nuances
of transactions
in relation to the usage convention and graph provider caveats alluded to
earlier.
Focusing on remote contexts first, note that it is still possible to issue
traversals from `g`, but those will
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc
b/docs/src/upgrade/release-4.x.x.asciidoc
index 176f68c116..712808f57f 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -151,6 +151,33 @@ Key behaviors consistent across all GLVs:
See the <<gremlin-drivers-variants,Gremlin Drivers and Variants>> reference
documentation for language-specific
syntax and examples.
+==== Managed Transaction Blocks
+
+In addition to the manual `begin()`/`commit()`/`rollback()` lifecycle, every
GLV (including the Java driver and embedded
+Java graphs) now offers closure-based convenience methods directly on
`GraphTraversalSource` (`g`). You hand `g` a
+function that receives the transaction-bound `g` (`gtx`), and the transaction
lifecycle is managed for you: the
+transaction is begun, your function runs, and it is committed on normal
completion or rolled back if your function
+raises. This reduces the common begin/do-work/commit boilerplate and makes it
harder to accidentally run a traversal
+against the non-transactional `g`, since only the transactional source is in
scope inside the closure. The methods are a
+Traversal-API convenience (they live on `g`, not on the `Transaction` returned
by `g.tx()`).
+
+The method naming follows each language's idiom (the value-returning form is
`evaluateInTx` rather than `call`, because
+`call` is already the `call()` service step on `GraphTraversalSource`):
+
+* **Java**: `g.executeInTx(Consumer)` (no return) and
`g.evaluateInTx(Function)` (returns the body's value).
+* **Python**: `g.execute_in_tx(fn)` — a single method that returns whatever
the function returns.
+* **JavaScript**: `await g.executeInTx(fn)` — the callback may be `async`;
resolves to the callback's return value.
+* **.NET**: `g.ExecuteInTxAsync(Func<..,Task>)` and
`g.EvaluateInTxAsync<T>(Func<..,Task<T>>)`, both accepting an optional
`CancellationToken`.
+* **Go**: `g.ExecuteInTx(func(*GraphTraversalSource) error) error` and
`g.EvaluateInTx(func(*GraphTraversalSource) (interface{}, error)) (interface{},
error)`.
+
+The closure runs the transaction exactly once (no automatic retry). If the
function fails, the original error is
+re-raised after rollback; if `commit()` fails, the commit error is raised and
a rollback is still attempted to release
+server-side resources. The manual `begin()`/`commit()`/`rollback()` API
remains available and unchanged for advanced
+use. See the <<gremlin-drivers-variants,Gremlin Drivers and Variants>>
reference documentation for language-specific
+examples.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-3253[TINKERPOP-3253]
+
==== Transaction Default Close Behavior Changed
The default behavior of `close()` on a remote transaction has been changed
from `commit` to `rollback` across all
diff --git
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
index 7a4adb054a..fa9be0a639 100644
---
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
+++
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java
@@ -51,12 +51,16 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
@@ -69,6 +73,8 @@ import java.util.function.UnaryOperator;
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class GraphTraversalSource implements TraversalSource {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GraphTraversalSource.class);
+
protected transient RemoteConnection connection;
protected final Graph graph;
protected TraversalStrategies strategies;
@@ -759,6 +765,93 @@ public class GraphTraversalSource implements
TraversalSource {
return this.connection.tx();
}
+ /**
+ * Runs the supplied unit of work inside a single transaction, managing
the transaction lifecycle automatically.
+ * <p>
+ * This is the no-return (action) form of {@link #evaluateInTx(Function)}.
It is a thin convenience wrapper that
+ * obtains a {@link Transaction} via {@link #tx()}, {@link
Transaction#begin() begins} it, invokes {@code txWork}
+ * with the transaction-bound {@link GraphTraversalSource} ({@code gtx}),
and then {@link Transaction#commit()
+ * commits} on normal completion. If {@code txWork} throws, the
transaction is {@link Transaction#rollback() rolled
+ * back} and the original error is re-thrown unchanged. Because the
lifecycle is driven through {@link #tx()}, the
+ * underlying transaction semantics (embedded thread-bound vs. remote
server session) are whatever the underlying
+ * {@code begin()}/{@code commit()}/{@code rollback()} provide.
+ * <p>
+ * This is a <strong>single-shot</strong> operation - exactly one attempt
is made, with no automatic retry. The
+ * lambda receives the transactional {@code gtx} and should issue its
traversals against that source only.
+ *
+ * @param txWork the unit of work to run against the transaction-bound
{@link GraphTraversalSource}
+ * @see #evaluateInTx(Function)
+ */
+ public void executeInTx(final Consumer<GraphTraversalSource> txWork) {
+ evaluateInTx(gtx -> {
+ txWork.accept(gtx);
+ return null;
+ });
+ }
+
+ /**
+ * Runs the supplied unit of work inside a single transaction, managing
the transaction lifecycle automatically,
+ * and returns the value the work produces.
+ * <p>
+ * This wrapper obtains a {@link Transaction} via {@link #tx()}, {@link
Transaction#begin() begins} it, invokes
+ * {@code txWork} with the transaction-bound {@link GraphTraversalSource}
({@code gtx}), and then
+ * {@link Transaction#commit() commits} on normal completion, returning
the value computed by {@code txWork}.
+ * Error handling:
+ * <ul>
+ * <li>If {@code txWork} throws, the transaction is {@link
Transaction#rollback() rolled back} and the exact
+ * original error is re-thrown to the caller. If that rollback
itself fails, the rollback failure is attached
+ * to the original error via {@link
Throwable#addSuppressed(Throwable)} and a warning is logged, but the
+ * original error still propagates as the primary error.</li>
+ * <li>If {@link Transaction#commit() commit} fails, a {@link
Transaction#rollback() rollback} is attempted
+ * afterward (to avoid leaving transaction resources tied up on the
server), and the commit error is re-thrown
+ * as the primary error. If the follow-up rollback also fails, the
rollback failure is attached to the commit
+ * error via {@link Throwable#addSuppressed(Throwable)} and a
warning is logged.</li>
+ * </ul>
+ * <p>
+ * This is a <strong>single-shot</strong> operation - exactly one attempt
is made, with no automatic retry. The
+ * lambda receives the transactional {@code gtx} and should issue its
traversals against that source only. Because
+ * the lifecycle is driven through {@link #tx()}, the underlying
transaction semantics (embedded thread-bound vs.
+ * remote server session) are whatever the underlying {@code
begin()}/{@code commit()}/{@code rollback()} provide.
+ *
+ * @param txWork the unit of work to run against the transaction-bound
{@link GraphTraversalSource}
+ * @param <T> the type of value produced by {@code txWork}
+ * @return the value produced by {@code txWork}
+ * @see #executeInTx(Consumer)
+ */
+ public <T> T evaluateInTx(final Function<GraphTraversalSource, T> txWork) {
+ final Transaction tx = this.tx();
+ final GraphTraversalSource gtx = tx.begin();
+ final T result;
+ // Phase 1: run the user's work. If it throws, roll back and rethrow the body error - the
+ // throw below exits the method, so a failed body never reaches the commit in phase 2.
+ try {
+ result = txWork.apply(gtx);
+ } catch (Throwable bodyError) {
+ try {
+ tx.rollback();
+ } catch (Throwable rollbackError) {
+ bodyError.addSuppressed(rollbackError);
+ LOGGER.warn("Rollback failed after transaction body error", rollbackError);
+ }
+ throw bodyError;
+ }
+ // Phase 2: the body succeeded, so commit. A separate try because this failure mode is
+ // distinct (commit, not body): we still roll back for server-side hygiene, then rethrow
+ // the commit error as the primary error.
+ try {
+ tx.commit();
+ } catch (Throwable commitError) {
+ try {
+ tx.rollback();
+ } catch (Throwable rollbackError) {
+ commitError.addSuppressed(rollbackError);
+ LOGGER.warn("Rollback failed after commit failure", rollbackError);
+ }
+ throw commitError;
+ }
+ return result;
+ }
+
/**
* If there is an underlying {@link RemoteConnection} it will be closed by
this method.
*/
diff --git
a/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs
b/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs
index 947e5b1ec8..9de184eb88 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Process/Traversal/GraphTraversalSource.cs
@@ -24,6 +24,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using Gremlin.Net.Driver;
using Gremlin.Net.Process.Remote;
using Gremlin.Net.Process.Traversal.Strategy.Decoration;
@@ -275,6 +277,109 @@ namespace Gremlin.Net.Process.Traversal
return _connection!.Tx(this);
}
+ /// <summary>
+ /// Runs a unit of work inside a transaction whose lifecycle is
managed automatically.
+ /// A transaction is started via <c>this.Tx().BeginAsync()</c>,
the supplied
+ /// <paramref name="txWork"/> is invoked with the transaction-bound
+ /// <see cref="GraphTraversalSource"/> (<c>gtx</c>), and the
transaction is then committed
+ /// on normal completion or rolled back on any failure.
+ ///
+ /// This is a single-shot wrapper (no retry): exactly one
+ /// begin → run → commit/rollback sequence is performed. Only
<c>gtx</c> is in scope inside
+ /// the body; the non-transactional source must not be used. If
<paramref name="txWork"/>
+ /// throws, the transaction is rolled back and the original
exception is re-thrown unchanged.
+ /// If the commit fails, a rollback is attempted for server-side
hygiene and the commit error
+ /// is propagated. A failed rollback during cleanup is swallowed
so it never replaces the
+ /// primary (body or commit) error.
+ /// </summary>
+ /// <param name="txWork">
+ /// The unit of work to run against the transaction-bound
<c>gtx</c>.
+ /// </param>
+ /// <param name="cancellationToken">The token to cancel the
operation.</param>
+ /// <returns>A <see cref="Task"/> that completes when the transaction
has been committed.</returns>
+ public async Task ExecuteInTxAsync(Func<GraphTraversalSource, Task>
txWork,
+ CancellationToken cancellationToken = default)
+ => await EvaluateInTxAsync<object?>(async gtx =>
+ {
+ await txWork(gtx).ConfigureAwait(false);
+ return null;
+ }, cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Runs a value-returning unit of work inside a transaction whose
lifecycle is managed
+ /// automatically. A transaction is started via
<c>this.Tx().BeginAsync()</c>, the supplied
+ /// <paramref name="txWork"/> is invoked with the transaction-bound
+ /// <see cref="GraphTraversalSource"/> (<c>gtx</c>), and the
transaction is then committed
+ /// on normal completion or rolled back on any failure.
+ ///
+ /// This is a single-shot wrapper (no retry): exactly one
+ /// begin → run → commit/rollback sequence is performed. Only
<c>gtx</c> is in scope inside
+ /// the body; the non-transactional source must not be used. The
value produced by
+ /// <paramref name="txWork"/> is returned to the caller after a
successful commit. If
+ /// <paramref name="txWork"/> throws, the transaction is rolled
back and the original
+ /// exception is re-thrown unchanged. If the commit fails, a
rollback is attempted for
+ /// server-side hygiene and the commit error is propagated. A
failed rollback during cleanup
+ /// is swallowed so it never replaces the primary (body or commit)
error.
+ /// </summary>
+ /// <typeparam name="T">The type of value produced by <paramref
name="txWork"/>.</typeparam>
+ /// <param name="txWork">
+ /// The unit of work to run against the transaction-bound
<c>gtx</c>. Its result is returned
+ /// to the caller once the transaction commits.
+ /// </param>
+ /// <param name="cancellationToken">The token to cancel the
operation.</param>
+ /// <returns>The value produced by <paramref name="txWork"/>.</returns>
+ public async Task<T> EvaluateInTxAsync<T>(Func<GraphTraversalSource,
Task<T>> txWork,
+ CancellationToken cancellationToken = default)
+ {
+ var tx = this.Tx();
+ var gtx = await tx.BeginAsync(cancellationToken).ConfigureAwait(false);
+ T result;
+ // Phase 1: run the user's work. If it throws, roll back and rethrow the body error - the
+ // throw below exits the method, so a failed body never reaches the commit in phase 2.
+ try
+ {
+ result = await txWork(gtx).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ try
+ {
+ // No cancellation token: if the body failed because the token was cancelled,
+ // honoring it here would abort the cleanup rollback before it is sent.
+ await tx.RollbackAsync().ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Rollback cleanup failure is swallowed so the original body error stays primary.
+ }
+ throw;
+ }
+
+ // Phase 2: the body succeeded, so commit. A separate try because this failure mode is
+ // distinct (commit, not body): we still roll back for server-side hygiene, then rethrow
+ // the commit error as the primary error.
+ // the commit error as the primary error.
+ try
+ {
+ await tx.CommitAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ try
+ {
+ // No cancellation token: honoring a cancelled token here would abort the
+ // cleanup rollback before it is sent, leaving the transaction open server-side.
+ await tx.RollbackAsync().ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Rollback cleanup failure is swallowed so the commit error stays primary.
+ }
+ throw;
+ }
+
+ return result;
+ }
+
/// <summary>
/// Add a GraphComputer class used to execute the traversal.
/// This adds a <see cref="VertexProgramStrategy" /> to the
strategies.
diff --git
a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs
b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs
index 57df672be9..9e62f09c38 100644
--- a/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs
+++ b/gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Driver/TransactionTests.cs
@@ -516,6 +516,127 @@ namespace Gremlin.Net.IntegrationTest.Driver
Assert.Equal(0L, await GetCount(client2, "drc_close_test"));
}
+ // Sentinel exception used by the body-throws closure test to assert the exact
+ // original error (type + message) propagates out of the closure wrapper.
+ private sealed class SentinelTransactionException : Exception
+ {
+ public SentinelTransactionException(string message) : base(message)
+ {
+ }
+ }
+
+ [Fact]
+ public async Task ShouldCommitOnSuccessWithExecuteInTxAsync()
+ {
+ using var client = CreateClient();
+ await DropGraph(client);
+ var connection = new DriverRemoteConnection(client, "gtx");
+ var g = AnonymousTraversalSource.Traversal().With(connection);
+
+ await g.ExecuteInTxAsync(async gtx => await gtx.AddV("person").Promise(t => t.Iterate()));
+
+ // The closure committed automatically on success, so the vertex is persisted.
+ Assert.Equal(1L, await GetCount(client, "person"));
+ }
+
+ [Fact]
+ public async Task ShouldRollbackAndRethrowWhenExecuteInTxAsyncBodyThrows()
+ {
+ using var client = CreateClient();
+ await DropGraph(client);
+ var connection = new DriverRemoteConnection(client, "gtx");
+ var g = AnonymousTraversalSource.Traversal().With(connection);
+
+ const string sentinelMessage = "sentinel-body-error-3f1c8e";
+
+ // (i) The exact original exception (type + message) propagates to the caller.
+ var ex = await Assert.ThrowsAsync<SentinelTransactionException>(() =>
+ g.ExecuteInTxAsync(async gtx =>
+ {
+ await gtx.AddV("person").Promise(t => t.Iterate());
+ throw new SentinelTransactionException(sentinelMessage);
+ }));
+ Assert.Equal(sentinelMessage, ex.Message);
+
+ // (ii) The closure rolled back automatically, so the vertex is
NOT persisted.
+ Assert.Equal(0L, await GetCount(client, "person"));
+ }
+
+ [Fact]
+ public async Task ShouldReturnBodyValueFromEvaluateInTxAsync()
+ {
+ using var client = CreateClient();
+ await DropGraph(client);
+ var connection = new DriverRemoteConnection(client, "gtx");
+ var g = AnonymousTraversalSource.Traversal().With(connection);
+
+ var n = await g.EvaluateInTxAsync(async gtx =>
+ {
+ await gtx.AddV("person").Promise(t => t.Iterate());
+ await gtx.AddV("person").Promise(t => t.Iterate());
+ return await gtx.V().Count().Promise(t => t.Next());
+ });
+
+ // The body counted the two vertices it added within the
transaction.
+ Assert.Equal(2L, n);
+
+ // The closure committed, so the same count is visible afterwards.
+ Assert.Equal(2L, await GetCount(client, "person"));
+ }
+
+ [Fact]
+ public async Task
ShouldThrowWhenOpeningNestedTransactionInsideExecuteInTxAsync()
+ {
+ using var client = CreateClient();
+ await DropGraph(client);
+ var connection = new DriverRemoteConnection(client, "gtx");
+ var g = AnonymousTraversalSource.Traversal().With(connection);
+
+ // Opening a SECOND transaction from inside the body must throw. The
+ // wrapper then rolls back and rethrows, surfacing the nesting error.
+ await Assert.ThrowsAsync<InvalidOperationException>(() =>
+ g.ExecuteInTxAsync(async gtx =>
+ {
+ // gtx.Tx() legitimately returns the SAME transaction (it must not throw);
+ // calling BeginAsync() on it opens a second transaction and must throw.
+ await gtx.Tx().BeginAsync();
+ }));
+ }
+
+ [Fact]
+ public async Task ShouldPropagateCommitErrorFromExecuteInTxAsync()
+ {
+ using var client = CreateClient();
+ await DropGraph(client);
+ var connection = new DriverRemoteConnection(client, "gtx");
+ var g = AnonymousTraversalSource.Traversal().With(connection);
+
+ // Drive a commit failure without a mock: from inside the body,
terminate the
+ // server-side transaction out-of-band (rollback by its
transactionId via a second
+ // client). The wrapper's automatic CommitAsync then fails
server-side with
+ // "Transaction not found", and that commit error must propagate
out of ExecuteInTxAsync.
+ using var sideClient = CreateClient();
+
+ var ex = await Assert.ThrowsAsync<ResponseException>(() =>
+ g.ExecuteInTxAsync(async gtx =>
+ {
+ await gtx.AddV("person").Promise(t => t.Iterate());
+
+ // Roll back this very transaction out-of-band so the
upcoming commit fails.
+ var txId = gtx.Tx().TransactionId!;
+ var rollbackMsg = RequestMessage.Build("g.tx().rollback()")
+ .AddG("gtx")
+ .AddField(Tokens.ArgsTransactionId, txId)
+ .Create();
+ await sideClient.SubmitAsync<object>(rollbackMsg);
+ }));
+
+ Assert.Contains("Transaction not found", ex.Message);
+
+ // The out-of-band rollback already discarded the work, so nothing
is persisted.
+ Assert.Equal(0L, await GetCount(client, "person"));
+ }
+
[Fact]
public async Task ShouldSerializeUnawaitedSubmissions()
{
diff --git a/gremlin-go/driver/graphTraversalSource.go
b/gremlin-go/driver/graphTraversalSource.go
index 727fe165e2..f75ea7c382 100644
--- a/gremlin-go/driver/graphTraversalSource.go
+++ b/gremlin-go/driver/graphTraversalSource.go
@@ -278,3 +278,109 @@ func (gts *GraphTraversalSource) Tx() *Transaction {
}
return &Transaction{client: drc.client}
}
+
+// ExecuteInTx runs txWork inside a single, automatically managed transaction
and
+// returns any error that occurred.
+//
+// It owns the full lifecycle: it obtains a transaction via Tx, calls Begin to
+// start it, passes the resulting transaction-bound GraphTraversalSource (gtx)
to
+// txWork, and then commits on success or rolls back on failure. The body must
use
+// only the gtx it receives; the non-transactional source is never in scope.
+//
+// This is single-shot: exactly one attempt is made (begin, run,
commit/rollback)
+// with no automatic retry.
+//
+// Error handling:
+// - If Begin fails, that error is returned and txWork is never invoked.
+// - If txWork returns a non-nil error, the transaction is rolled back and
the
+// original body error is returned unchanged.
+// - If the commit fails, a rollback is attempted for server-side hygiene and
+// the commit error is returned (it takes precedence over any rollback
error).
+// - If a rollback attempted during cleanup itself fails, a warning is logged
+// but the primary (body or commit) error still propagates.
+// - If txWork panics, the transaction is rolled back and the panic is
+// re-raised (it is never swallowed).
+//
+// For a transaction body that needs to return a value, use EvaluateInTx
instead.
+func (gts *GraphTraversalSource) ExecuteInTx(txWork func(*GraphTraversalSource) error) error {
+ _, err := gts.EvaluateInTx(func(gtx *GraphTraversalSource) (interface{}, error) {
+ return nil, txWork(gtx)
+ })
+ return err
+}
+
+// EvaluateInTx runs txWork inside a single, automatically managed transaction
and
+// returns the value produced by txWork along with any error.
+//
+// It is the value-returning counterpart to ExecuteInTx. The value is returned
as
+// interface{}, matching the rest of the driver's untyped result API (e.g.
+// Traversal.Next returns a *Result whose concrete value is obtained via the
+// Result.Get* accessors); the caller type-asserts the returned value as
needed.
+//
+// It owns the full lifecycle: it obtains a transaction via gts.Tx, calls
Begin to
+// start it, passes the resulting transaction-bound GraphTraversalSource (gtx)
to
+// txWork, and then commits on success or rolls back on failure. The body must
use
+// only the gtx it receives; the non-transactional source is never in scope.
+//
+// This is single-shot: exactly one attempt is made (begin, run,
commit/rollback)
+// with no automatic retry.
+//
+// Error handling:
+// - If Begin fails, that error is returned (with a nil value) and txWork is
+// never invoked.
+// - If txWork returns a non-nil error, the transaction is rolled back and
the
+// original body error is returned unchanged, along with the value txWork
+// returned.
+// - If the commit fails, a rollback is attempted for server-side hygiene and
+// the commit error is returned (it takes precedence over any rollback
error).
+// - If a rollback attempted during cleanup itself fails, a warning is logged
+// but the primary (body or commit) error still propagates.
+// - If txWork panics, the transaction is rolled back and the panic is
+// re-raised (it is never swallowed).
+func (gts *GraphTraversalSource) EvaluateInTx(
+ txWork func(*GraphTraversalSource) (interface{}, error)) (interface{}, error) {
+
+ var result interface{}
+ tx := gts.Tx()
+ gtx, err := tx.Begin()
+ if err != nil {
+ return result, err
+ }
+
+ // rollbackQuietly performs a best-effort cleanup rollback. It never
propagates
+ // a failure - a returned error is logged, and a panic from Rollback
itself is
+ // recovered and discarded - so it can never mask the primary (body,
commit, or
+ // panic) error. A failed rollback is not fatal anyway: the server
rolls the
+ // transaction back when it hits its transaction timeout.
+ rollbackQuietly := func() {
+ defer func() { _ = recover() }()
+ if rbErr := tx.Rollback(); rbErr != nil {
+ tx.logRollbackWarning(rbErr)
+ }
+ }
+
+ // A panic in the body must roll back the transaction and then re-panic
so the
+ // original panic is never swallowed.
+ defer func() {
+ if r := recover(); r != nil {
+ rollbackQuietly()
+ panic(r)
+ }
+ }()
+
+ result, err = txWork(gtx)
+ if err != nil {
+ // Body returned an error: roll back and surface the exact
original error.
+ rollbackQuietly()
+ return result, err
+ }
+
+ if commitErr := tx.Commit(); commitErr != nil {
+ // Commit failed: attempt a rollback for server-side hygiene,
but the
+ // commit error remains the primary error returned to the
caller.
+ rollbackQuietly()
+ return result, commitErr
+ }
+
+ return result, nil
+}
diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go
index 9ae624284a..5f294118c5 100644
--- a/gremlin-go/driver/logger.go
+++ b/gremlin-go/driver/logger.go
@@ -116,4 +116,5 @@ const (
closeClient errorKey = "CLOSE_CLIENT"
failedToCloseResponseBody errorKey = "FAILED_TO_CLOSE_RESPONSE_BODY"
failedToCloseDecompReader errorKey =
"FAILED_TO_CLOSE_DECOMPRESSION_READER"
+ rollbackFailedDuringCleanup errorKey = "ROLLBACK_FAILED_DURING_CLEANUP"
)
diff --git a/gremlin-go/driver/resources/logger-messages/en.json
b/gremlin-go/driver/resources/logger-messages/en.json
index f7025e837a..3787040a63 100644
--- a/gremlin-go/driver/resources/logger-messages/en.json
+++ b/gremlin-go/driver/resources/logger-messages/en.json
@@ -22,5 +22,6 @@
"FAILED_TO_RECEIVE_RESPONSE": "Failed to receive response: %s",
"FAILED_TO_SEND_REQUEST": "Failed to send request: %s",
"FAILED_TO_CLOSE_RESPONSE_BODY": "Error closing response body: %s",
- "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader:
%s"
+ "FAILED_TO_CLOSE_DECOMPRESSION_READER": "Error closing decompression reader:
%s",
+ "ROLLBACK_FAILED_DURING_CLEANUP": "Rollback failed during transaction
cleanup; the primary error still propagates: %s"
}
diff --git a/gremlin-go/driver/transaction.go b/gremlin-go/driver/transaction.go
index 9f7846031c..e57177a65c 100644
--- a/gremlin-go/driver/transaction.go
+++ b/gremlin-go/driver/transaction.go
@@ -172,6 +172,19 @@ func (t *Transaction) Submit(gremlin string, options
...RequestOptions) (ResultS
return t.client.SubmitWithOptions(gremlin, opts)
}
+// logRollbackWarning logs a warning that a rollback attempted during
transaction
+// cleanup failed. The rollback failure is non-fatal: the primary body or
commit
+// error still propagates to the caller.
+//
+// It backs the rollback-cleanup handling in the GraphTraversalSource
transaction
+// closure helpers (ExecuteInTx / EvaluateInTx); those callers live in the same
+// package and reach the driver's logHandler through this Transaction method.
+func (t *Transaction) logRollbackWarning(rbErr error) {
+ if t.client != nil && t.client.logHandler != nil {
+ t.client.logHandler.logf(Warning, rollbackFailedDuringCleanup,
rbErr.Error())
+ }
+}
+
func extractTransactionId(results []*Result) (string, error) {
if len(results) == 0 {
return "", fmt.Errorf("server did not return transaction ID")
diff --git a/gremlin-go/driver/transaction_test.go
b/gremlin-go/driver/transaction_test.go
index a449913809..9c9cc17674 100644
--- a/gremlin-go/driver/transaction_test.go
+++ b/gremlin-go/driver/transaction_test.go
@@ -21,6 +21,7 @@ package gremlingo
import (
"crypto/tls"
+ "errors"
"testing"
"github.com/stretchr/testify/assert"
@@ -536,7 +537,6 @@ func TestTransactionBeginAfterRollback(t *testing.T) {
assert.Contains(t, err.Error(), "E1101")
}
-
func TestTransactionRollbackOnClientClose(t *testing.T) {
client := newTxClient(t)
dropTxGraph(t, client)
@@ -579,7 +579,6 @@ func TestTransactionRollbackOnDrcClose(t *testing.T) {
assert.Equal(t, int64(0), getTxCount(t, client2, "drc_close_test"))
}
-
func TestTransactionMultiRollback(t *testing.T) {
client := newTxClient(t)
defer client.Close()
@@ -631,3 +630,191 @@ func TestTransactionMultiCommitAndRollback(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(0), getTxCount(t, client, "multi_cr2"))
}
+
+func TestTransactionExecuteInTxCommitsOnSuccess(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error {
+ promise := gtx.AddV("person").Iterate()
+ return <-promise
+ })
+ assert.Nil(t, err)
+
+ // Committed data is visible outside the transaction.
+ assert.Equal(t, int64(1), getTxCount(t, client, "person"))
+}
+
+func TestTransactionExecuteInTxRollsBackAndRethrowsOnBodyError(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ sentinel := errors.New("intentional body failure")
+
+ err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error {
+ // Add a vertex, then fail: the add must be rolled back.
+ promise := gtx.AddV("person").Iterate()
+ assert.Nil(t, <-promise)
+ return sentinel
+ })
+
+ // (i) the exact original error is returned unchanged.
+ assert.NotNil(t, err)
+ assert.True(t, errors.Is(err, sentinel))
+ assert.Equal(t, sentinel, err)
+
+ // (ii) the vertex was NOT persisted (rollback happened).
+ assert.Equal(t, int64(0), getTxCount(t, client, "person"))
+}
+
+// EvaluateInTx returns the value computed by the body as interface{}, matching
+// the driver's untyped result API; the caller type-asserts it.
+func TestTransactionEvaluateInTxReturnsValue(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ v, err := g.EvaluateInTx(func(gtx *GraphTraversalSource) (interface{},
error) {
+ // Add two vertices and return the in-transaction count.
+ promise := gtx.AddV("widget").Iterate()
+ if pErr := <-promise; pErr != nil {
+ return nil, pErr
+ }
+ promise = gtx.AddV("widget").Iterate()
+ if pErr := <-promise; pErr != nil {
+ return nil, pErr
+ }
+ counts, cErr := gtx.V().HasLabel("widget").Count().ToList()
+ if cErr != nil {
+ return nil, cErr
+ }
+ count, cErr := counts[0].GetInt64()
+ if cErr != nil {
+ return nil, cErr
+ }
+ return count, nil
+ })
+ assert.Nil(t, err)
+ // the body returned an int64; the caller type-asserts the interface{}
result
+ assert.Equal(t, int64(2), v)
+
+ // The committed value matches what was returned.
+ assert.Equal(t, int64(2), getTxCount(t, client, "widget"))
+}
+
+// Opening a SECOND transaction from inside the body must error. gtx.Tx()
itself
+// legitimately returns the SAME transaction (it must not error - that is the
+// commit path), but a nested Begin() on it is rejected by the existing
+// double-begin guard (E1101).
+func TestTransactionExecuteInTxRejectsNestedBegin(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error {
+ // gtx.Tx() returns the bound, already-open transaction (must
NOT error).
+ tx := gtx.Tx()
+ assert.True(t, tx.IsOpen())
+
+ // gtx.Tx() called again returns that same transaction handle.
+ assert.Equal(t, tx, gtx.Tx())
+
+ // Opening a second transaction (a nested begin) IS rejected.
+ _, beginErr := tx.Begin()
+ assert.NotNil(t, beginErr)
+ assert.Contains(t, beginErr.Error(), "E1101")
+
+ // Surface the nested-begin error from the body so the wrapper
rolls back.
+ return beginErr
+ })
+
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), "E1101")
+}
+
+// Verifies that a failure of the wrapper's automatic Commit() is surfaced from
+// ExecuteInTx. To drive a deterministic, no-mock commit failure, the body
+// succeeds but the transaction is rolled back from a SEPARATE connection (by
its
+// transactionId) before the body returns; the wrapper's Commit() is then a
real
+// commit RPC that the server rejects with "Transaction not found".
+func TestTransactionExecuteInTxPropagatesCommitFailure(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ // a second connection used to kill the transaction out-of-band
+ sideClient := newTxClient(t)
+ defer sideClient.Close()
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ err := g.ExecuteInTx(func(gtx *GraphTraversalSource) error {
+ if pErr := <-gtx.AddV("commit_fail").Iterate(); pErr != nil {
+ return pErr
+ }
+ // Roll the transaction back from a separate connection,
targeting it by its
+ // transactionId. The body still returns nil, so the wrapper
proceeds to commit -
+ // which now fails server-side because the transaction no
longer exists.
+ txId := gtx.Tx().TransactionId()
+ _, rbErr := sideClient.SubmitWithOptions("g.tx().rollback()",
+ new(RequestOptionsBuilder).SetTransactionId(txId).Create())
+ return rbErr
+ })
+
+ // the commit failure is the error surfaced to the caller
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), "Transaction not found")
+
+ // data was not persisted (the transaction was rolled back out-of-band)
+ assert.Equal(t, int64(0), getTxCount(t, client, "commit_fail"))
+}
+
+func TestTransactionExecuteInTxRollsBackAndRepanicsOnPanic(t *testing.T) {
+ client := newTxClient(t)
+ defer client.Close()
+ dropTxGraph(t, client)
+
+ remote := newTxRemoteConnection(t)
+ defer remote.Close()
+ g := Traversal_().With(remote)
+
+ func() {
+ defer func() {
+ r := recover()
+ // The panic must propagate out of ExecuteInTx (not be
swallowed).
+ assert.NotNil(t, r)
+ assert.Equal(t, "intentional body panic", r)
+ }()
+
+ _ = g.ExecuteInTx(func(gtx *GraphTraversalSource) error {
+ promise := gtx.AddV("panic_vertex").Iterate()
+ assert.Nil(t, <-promise)
+ panic("intentional body panic")
+ })
+ // Unreachable: ExecuteInTx must re-panic.
+ assert.Fail(t, "expected ExecuteInTx to re-panic")
+ }()
+
+ // The transaction was rolled back during panic handling: nothing
persisted.
+ assert.Equal(t, int64(0), getTxCount(t, client, "panic_vertex"))
+}
diff --git a/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts
b/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts
index b82f8c9362..e4df507086 100644
--- a/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts
+++ b/gremlin-js/gremlin-javascript/lib/process/graph-traversal.ts
@@ -81,6 +81,74 @@ export class GraphTraversalSource {
return new Transaction(client);
}
+ /**
+ * Runs a unit of work inside a transaction, owning the full lifecycle.
+ *
+ * The transaction is begun automatically and the resulting transaction-bound
+ * <code>GraphTraversalSource</code> (<code>gtx</code>) is passed to the
+ * callback as its sole argument. Only <code>gtx</code> is in scope inside
the
+ * callback, which avoids accidentally issuing traversals against the
+ * non-transactional <code>g</code>.
+ *
+ * On normal completion of the callback the transaction is committed; on any
+ * abnormal exit (a thrown error or a rejected promise) the transaction is
+ * rolled back. This is a single-shot operation: there is exactly one attempt
+ * (begin -> run -> commit/rollback) and no retry. A second transaction may
not
+ * be opened from inside the callback (calling <code>gtx.tx().begin()</code>
+ * throws via the existing double-begin guard).
+ *
+ * The callback may be synchronous or asynchronous; its return value (or the
+ * value its promise resolves to) is awaited and returned from
+ * <code>executeInTx</code>. If the callback returns nothing the result is
+ * <code>undefined</code>.
+ *
+ * Error handling: if the callback throws/rejects, the original error is
+ * re-raised unchanged after rollback is attempted. If the commit itself
fails,
+ * the commit error propagates and a rollback is still attempted afterward
for
+ * server-side hygiene. In both cases a failed rollback during cleanup is
logged
+ * as a warning and never replaces the primary error.
+ *
+ * @param txWork callback that receives the transaction-bound
GraphTraversalSource
+ * @returns {Promise<T>} resolves to the callback's value (undefined if none)
+ */
+ async executeInTx<T>(txWork: (gtx: GraphTraversalSource) => T | Promise<T>):
Promise<T> {
+ const tx = this.tx();
+ const gtx = await tx.begin();
+
+ let result: T;
+ // Phase 1: run the user's work. If it throws, roll back and rethrow the
body error - the
+ // throw below exits the method, so a failed body never reaches the commit
in phase 2.
+ try {
+ result = await txWork(gtx);
+ } catch (bodyError) {
+ try {
+ await tx.rollback();
+ } catch (rollbackError) {
+ console.warn('Rollback failed after transaction body error',
rollbackError);
+ }
+ // Re-raise the exact original error; rollback failures never replace it.
+ throw bodyError;
+ }
+
+ // Phase 2: the body succeeded, so commit. A separate try because this
failure mode is
+ // distinct (commit, not body): we still roll back for server hygiene,
then rethrow the
+ // commit error as the primary error.
+ try {
+ await tx.commit();
+ } catch (commitError) {
+ try {
+ // Attempt rollback so the server does not keep the transaction around.
+ await tx.rollback();
+ } catch (rollbackError) {
+ console.warn('Rollback failed after commit failure', rollbackError);
+ }
+ // The commit error stays primary; rollback was only for server hygiene.
+ throw commitError;
+ }
+
+ return result;
+ }
+
/**
* @param graphComputer
* @param workers
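
The executeInTx documentation above says the callback may be synchronous or asynchronous and that its value is awaited either way. In JavaScript a single `await` covers both cases. The following is a minimal Python sketch of the same idea, not part of this commit: `run_callback`, `sync_work` and `async_work` are hypothetical names used only for illustration.

```python
import asyncio
import inspect


async def run_callback(work, arg):
    """Call work(arg) and await the result only if it is awaitable."""
    result = work(arg)
    if inspect.isawaitable(result):
        # Asynchronous callback: wait for the value it resolves to.
        result = await result
    return result


def sync_work(x):
    # Plain function: its return value is used directly.
    return x * 2


async def async_work(x):
    # Coroutine function: its result must be awaited.
    return x + 1


sync_value = asyncio.run(run_callback(sync_work, 3))
async_value = asyncio.run(run_callback(async_work, 3))
```

Accepting both shapes keeps one entry point for callers, at the cost of the wrapper itself always being asynchronous.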
diff --git
a/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js
b/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js
index 5abfc8436c..01a95367df 100644
--- a/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js
+++ b/gremlin-js/gremlin-javascript/test/integration/transaction-tests.js
@@ -421,4 +421,118 @@ describe('Transaction', function () {
await nonTxClient.close();
});
});
+
+ describe('Closure API (g.executeInTx())', function () {
+ it('should commit on success', async function () {
+ const connection = getConnection('gtx');
+ const g = anon.traversal().withRemote(connection);
+
+ await g.executeInTx(async (gtx) => {
+ await gtx.addV('person').iterate();
+ });
+
+ // Fresh non-transactional query confirms the vertex was committed.
+ const count = await g.V().hasLabel('person').count().next();
+ assert.strictEqual(count.value, 1);
+
+ await connection.close();
+ });
+
+ it('should rollback and rethrow when the body throws', async function () {
+ const connection = getConnection('gtx');
+ const g = anon.traversal().withRemote(connection);
+
+ const sentinelMessage = 'sentinel body failure 0xC0FFEE';
+ class SentinelError extends Error {}
+
+ let caught;
+ try {
+ await g.executeInTx(async (gtx) => {
+ await gtx.addV('person').iterate();
+ throw new SentinelError(sentinelMessage);
+ });
+ assert.fail('Expected executeInTx() to rethrow the body error');
+ } catch (e) {
+ caught = e;
+ }
+
+ // (i) the exact original error (type + message) is rethrown to the
caller
+ assert.ok(caught instanceof SentinelError);
+ assert.strictEqual(caught.message, sentinelMessage);
+
+ // (ii) the vertex added in the body was NOT persisted (rollback
happened)
+ const count = await g.V().hasLabel('person').count().next();
+ assert.strictEqual(count.value, 0);
+
+ await connection.close();
+ });
+
+ it('should return the body value', async function () {
+ const connection = getConnection('gtx');
+ const g = anon.traversal().withRemote(connection);
+
+ // Seed a vertex outside the transaction so the body can count it.
+ await g.addV('person').iterate();
+
+ const n = await g.executeInTx((gtx) => gtx.V().count().next());
+ assert.strictEqual(n.value, 1);
+
+ await connection.close();
+ });
+
+ it('should reject opening a nested transaction in the body', async
function () {
+ const connection = getConnection('gtx');
+ const g = anon.traversal().withRemote(connection);
+
+ await assert.rejects(
+ () =>
+ g.executeInTx(async (gtx) => {
+ // gtx.tx() legitimately returns the SAME transaction; calling
begin()
+ // on it opens a second transaction and trips the double-begin
guard.
+ await gtx.tx().begin();
+ }),
+ /Transaction already started/
+ );
+
+ await connection.close();
+ });
+
+ // Forces a real, no-mock commit failure by rolling the transaction back
+ // out-of-band through a second client, targeting it by transactionId.
+ // executeInTx's own trailing commit() then fails because the server no
+ // longer knows the transaction, and that error propagates.
+ it('should propagate a commit failure out of executeInTx', async function
() {
+ const connection = getConnection('gtx');
+ const g = anon.traversal().withRemote(connection);
+ // a second connection used to kill the transaction out-of-band
+ const sideClient = getClient('gtx');
+
+ // To drive a deterministic, no-mock commit failure, the body succeeds
but the
+ // transaction is rolled back from a separate connection (by its
transactionId)
+ // before the body returns; executeInTx's commit() is then a real commit
RPC that
+ // the server rejects (HTTP 404, "Transaction not found").
+ let caught;
+ try {
+ await g.executeInTx(async (gtx) => {
+ await gtx.addV('person').iterate();
+ const transactionId = gtx.tx().transactionId;
+ await sideClient.submit('g.tx().rollback()', null, { transactionId });
+ });
+ assert.fail('Expected executeInTx() to propagate the commit failure');
+ } catch (e) {
+ caught = e;
+ }
+ // The server maps a missing/closed transaction to HTTP 404; the body
text is on
+ // statusMessage (the driver puts the HTTP status line in the Error
message).
+ assert.strictEqual(caught.statusCode, 404);
+ assert.ok(caught.statusMessage.includes('Transaction not found'));
+
+ // The body's work was not persisted (the transaction was rolled back
out-of-band).
+ const count = await g.V().hasLabel('person').count().next();
+ assert.strictEqual(count.value, 0);
+
+ await sideClient.close();
+ await connection.close();
+ });
+ });
});
diff --git
a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
index 411ceeb806..399c44e448 100644
--- a/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
+++ b/gremlin-python/src/main/python/gremlin_python/process/graph_traversal.py
@@ -164,6 +164,59 @@ class GraphTraversalSource(object):
from gremlin_python.driver.transaction import Transaction
return Transaction(remote_connection._client)
+ def execute_in_tx(self, tx_work):
+ """Runs a unit of work inside a transaction, managing its lifecycle.
+
+ The transaction is started automatically via tx().begin() and the
+ transaction-bound GraphTraversalSource (gtx) is passed to tx_work as
its
+ sole argument. Only gtx should be used inside the body; the
+ non-transactional g is not in scope. On normal completion the
+ transaction is committed and the body's return value is returned
+ (None if the body returns nothing). If the body raises, the
+ transaction is rolled back and the exact original exception is
+ re-raised unchanged.
+
+ This is single-shot: exactly one begin -> run -> commit/rollback
+ attempt is made; there is no automatic retry. If the body raises and
+ the follow-up rollback also fails, a warning is logged and the original
+ body exception still propagates. If commit() fails (e.g. the server
+ already rolled the transaction back), a rollback is attempted for
+ server-side hygiene and the commit error propagates as the primary
+ error.
+
+ For example, g.execute_in_tx(lambda gtx: gtx.addV('person').iterate())
+ runs the body and commits, while
+ count = g.execute_in_tx(lambda gtx: gtx.V().count().next()) returns the
+ body's value.
+ """
+ tx = self.tx()
+ gtx = tx.begin()
+ # Phase 1: run the user's work. If it raises, roll back and re-raise
the body error - the
+ # bare `raise` exits the method, so a failed body never reaches the
commit in phase 2.
+ try:
+ result = tx_work(gtx)
+ except BaseException:
+ # Catch BaseException so any abnormal exit (including
KeyboardInterrupt) rolls back.
+ try:
+ tx.rollback()
+ except BaseException as rollback_error:
+ # The cleanup rollback failure is only logged - the bare
`raise` below re-raises
+ # the original body error unchanged so it always stays the
primary error.
+ log.warning("Rollback failed after transaction body error: %s", rollback_error)
+ raise
+ # Phase 2: the body succeeded, so commit. A separate try because this
failure mode is
+ # distinct (commit, not body): we still roll back for server-side
hygiene, then re-raise
+ # the commit error as the primary error.
+ try:
+ tx.commit()
+ except Exception:
+ try:
+ tx.rollback()
+ except BaseException as rollback_error:
+ log.warning("Rollback failed after commit failure: %s", rollback_error)
+ raise
+ return result
+
def withComputer(self, graph_computer=None, workers=None, result=None,
persist=None, vertices=None,
edges=None, configuration=None):
warnings.warn(
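
The two-phase lifecycle described in the execute_in_tx docstring can be exercised without a server. The following is a minimal sketch, not the gremlin-python implementation: `FakeTransaction` and `run_in_tx` are hypothetical stand-ins that only record which lifecycle calls were made, and the string "gtx" is a placeholder for the transaction-bound traversal source.

```python
import logging

log = logging.getLogger("tx_sketch")


class FakeTransaction:
    """Hypothetical stand-in for a driver transaction; records lifecycle calls."""

    def __init__(self, fail_rollback=False):
        self.calls = []
        self.fail_rollback = fail_rollback

    def begin(self):
        self.calls.append("begin")
        return "gtx"  # placeholder for the transaction-bound source

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")
        if self.fail_rollback:
            raise RuntimeError("rollback failed")


def run_in_tx(tx, tx_work):
    """Same shape as execute_in_tx: run the body, then commit."""
    gtx = tx.begin()
    # Phase 1: a failing body is rolled back and its error re-raised.
    try:
        result = tx_work(gtx)
    except BaseException:
        try:
            tx.rollback()
        except BaseException as rollback_error:
            log.warning("Rollback failed after body error: %s", rollback_error)
        raise  # re-raises the original body error, not the rollback error
    # Phase 2: the body succeeded, so commit.
    tx.commit()
    return result


def failing_body(gtx):
    raise ValueError("body failed")


ok_tx = FakeTransaction()
value = run_in_tx(ok_tx, lambda gtx: 42)

bad_tx = FakeTransaction(fail_rollback=True)
try:
    run_in_tx(bad_tx, failing_body)
    body_error = None
except ValueError as e:
    body_error = e
```

In the failing case the rollback itself raises, yet the caller still sees the `ValueError` from the body; the rollback failure is only logged. The commit-failure branch is omitted here to keep the sketch short.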
diff --git
a/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py
b/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py
index 77b5a04241..d7f2da1111 100644
---
a/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py
+++
b/gremlin-python/src/main/python/tests/integration/driver/test_transaction.py
@@ -410,3 +410,89 @@ class TestTransaction(object):
tx2.rollback()
result =
client.submit("g.V().hasLabel('multi_cr2').count()").all().result()
assert result[0] == 0
+
+ def test_execute_in_tx_commits_on_success(self, remote_connection):
+ g = traversal().with_(remote_connection)
+
+ g.execute_in_tx(lambda gtx: gtx.addV('person').property('name',
'exec_commit').iterate())
+
+ c = Client(test_no_auth_url, 'gtx')
+ result =
c.submit("g.V().has('person','name','exec_commit').count()").all().result()
+ assert result[0] == 1
+ c.close()
+
+ def test_execute_in_tx_rolls_back_and_rethrows_when_body_throws(self,
remote_connection):
+ # Asserts BOTH that the exact original exception (type + message)
+ # propagates and that the vertex added before the raise is NOT
persisted.
+ g = traversal().with_(remote_connection)
+
+ def body(gtx):
+ gtx.addV('person').property('name', 'exec_throw').iterate()
+ raise RuntimeError("simulated body failure 0xDEADBEEF")
+
+ with pytest.raises(RuntimeError, match="simulated body failure
0xDEADBEEF"):
+ g.execute_in_tx(body)
+
+ c = Client(test_no_auth_url, 'gtx')
+ result =
c.submit("g.V().has('person','name','exec_throw').count()").all().result()
+ assert result[0] == 0
+ c.close()
+
+ def test_execute_in_tx_returns_body_value(self, remote_connection):
+ g = traversal().with_(remote_connection)
+
+ # Seed two vertices, each in its own committed transaction, so the
+ # value-returning body has something to count.
+ g.execute_in_tx(lambda gtx: gtx.addV('person').iterate())
+ g.execute_in_tx(lambda gtx: gtx.addV('person').iterate())
+
+ count = g.execute_in_tx(lambda gtx: gtx.V().count().next())
+ assert count == 2
+
+ def test_execute_in_tx_rejects_nested_transaction(self, remote_connection):
+ # Opening a SECOND transaction inside the body must raise. gtx.tx()
+ # itself legitimately returns the same transaction (so we do NOT assert
+ # it raises); calling begin() on it does raise.
+ g = traversal().with_(remote_connection)
+
+ def body(gtx):
+ # gtx.tx() returns the same (already-open) transaction; begin()
+ # on an already-open transaction is the double-begin guard.
+ gtx.tx().begin()
+
+ with pytest.raises(Exception, match="Transaction already started"):
+ g.execute_in_tx(body)
+
+ def test_execute_in_tx_propagates_commit_failure(self, remote_connection):
+ # To drive a deterministic, no-mock commit failure, the body succeeds
but
+ # the transaction is rolled back server-side from a separate connection
+ # (by its transactionId) before the body returns, leaving the id
invalid.
+ # The commit() inside execute_in_tx() then fails with "Transaction not
+ # found", and that commit error propagates to the caller.
+ g = traversal().with_(remote_connection)
+
+ invalidator = Client(test_no_auth_url, 'gtx')
+
+ def body(gtx):
+ # Add work, then invalidate the transaction server-side by rolling
+ # it back through a separate plain client that targets this
+ # transaction's id. The body itself completes normally so
execute_in_tx()
+ # proceeds to commit(), which will now fail.
+ gtx.addV('person').property('name', 'exec_commit_fail').iterate()
+ tx_id = gtx.tx().transaction_id
+ invalidator.submit("g.tx().rollback()",
+ request_options={'transactionId': tx_id}).all().result()
+
+ try:
+ with pytest.raises(Exception) as exc_info:
+ g.execute_in_tx(body)
+ # The commit error is the primary error surfaced to the caller.
+ assert "Transaction not found" in str(exc_info.value)
+ finally:
+ invalidator.close()
+
+ # Nothing was persisted (the server rolled the transaction back).
+ c = Client(test_no_auth_url, 'gtx')
+ result = c.submit("g.V().has('person','name','exec_commit_fail').count()").all().result()
+ assert result[0] == 0
+ c.close()
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
index 69f387b17e..5f0e69d43c 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -703,4 +704,128 @@ public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServer
cluster2.close();
}
}
+
+ /**
+ * A sentinel exception used by {@link #shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows()} to verify
+ * that the exact original error thrown by an {@code executeInTx}/{@code evaluateInTx} body propagates to the
+ * caller unchanged.
+ */
+ private static class TxClosureSentinelException extends RuntimeException {
+ TxClosureSentinelException(final String message) {
+ super(message);
+ }
+ }
+
+ @Test
+ public void shouldCommitWhenTxClosureBodyCompletes() throws Exception {
+ final Client client = cluster.connect().alias(GTX);
+ final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX));
+
+ g.executeInTx(gtx -> gtx.addV("person").property("name", "alice").iterate());
+
+ // committed data is visible to non-transactional reads
+ assertEquals(1L, client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+ client.close();
+ }
+
+ @Test
+ public void shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows() throws Exception {
+ final Client client = cluster.connect().alias(GTX);
+ final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX));
+
+ try {
+ g.executeInTx(gtx -> {
+ gtx.addV("person").property("name", "bob").iterate();
+ throw new TxClosureSentinelException("boom");
+ });
+ fail("The exact exception thrown by the closure body should propagate to the caller");
+ } catch (Exception ex) {
+ // (i) the exact original error (type + message) propagates to the caller, unchanged
+ assertThat(ex, instanceOf(TxClosureSentinelException.class));
+ assertEquals("boom", ex.getMessage());
+ }
+
+ // (ii) the vertex added before the error was rolled back and is not persisted
+ assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+ client.close();
+ }
+
+ @Test
+ public void shouldReturnBodyValueFromTxClosureCall() throws Exception {
+ final Client client = cluster.connect().alias(GTX);
+ final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX));
+
+ final Long count = g.evaluateInTx(gtx -> {
+ gtx.addV("person").iterate();
+ gtx.addV("person").iterate();
+ return gtx.V().hasLabel("person").count().next();
+ });
+
+ // the value computed in the body is returned to the caller
+ assertEquals(Long.valueOf(2L), count);
+
+ // the work also committed, so the data is visible afterward
+ assertEquals(2L, client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+ client.close();
+ }
+
+ @Test
+ public void shouldRejectOpeningSecondTransactionInsideTxClosureBody() throws Exception {
+ final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX));
+
+ g.executeInTx(gtx -> {
+ // gtx.tx() itself is legitimate and returns a transaction handle - it must NOT throw, as it is the
+ // standard way to commit/rollback when holding a transactional source.
+ final Transaction nested = gtx.tx();
+ assertNotNull(nested);
+
+ // opening a SECOND transaction from within an already-open one must raise. The remote
+ // HttpRemoteTransaction.begin() guards against double-begin, so the remote nesting test asserts on begin().
+ try {
+ nested.begin();
+ fail("Opening a second transaction from within an already-open one should raise");
+ } catch (Exception ex) {
+ // expected - the transaction is already started
+ assertThat(ex, instanceOf(IllegalStateException.class));
+ assertThat(ex.getMessage(), containsString("Transaction already started"));
+ }
+
+ gtx.addV("person").iterate();
+ });
+ }
+
+ @Test
+ public void shouldPropagateCommitFailureFromTxClosure() throws Exception {
+ final Client client = cluster.connect().alias(GTX);
+ // a second connection used to kill the transaction out-of-band, so the closure's own commit fails
+ final Client sideClient = cluster.connect().alias(GTX);
+ final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX));
+
+ // The closure body succeeds, but the transaction is rolled back server-side from a separate
+ // connection (by its transactionId) before the body returns. The wrapper's automatic commit() then
+ // fails with "Transaction not found", and that commit error must propagate out of executeInTx().
+ try {
+ g.executeInTx(gtx -> {
+ gtx.addV("person").property("name", "doomed_commit").iterate();
+ final String txId = ((RemoteTransaction) gtx.tx()).getTransactionId();
+ sideClient.submit("g.tx().rollback()",
+ RequestOptions.build().addG(GTX).transactionId(txId).create()).all().join();
+ });
+ fail("The commit failure should propagate out of executeInTx()");
+ } catch (Exception ex) {
+ // the commit failure is the error surfaced to the caller; the server reports it as
+ // "Transaction not found" (asserted on the root cause, as the other transaction-error tests do)
+ final Throwable root = ExceptionHelper.getRootCause(ex);
+ assertThat(root.getMessage(), containsString("Transaction not found"));
+ }
+
+ // data was not persisted (the transaction was rolled back out-of-band)
+ assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong());
+
+ client.close();
+ sideClient.close();
+ }
}
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
index bd6fc31104..4efc21576b 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
@@ -44,6 +44,7 @@ import static org.apache.tinkerpop.gremlin.structure.Graph.Features.VertexProper
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@@ -980,4 +981,91 @@ public class TransactionTest extends AbstractGremlinTest {
IteratorUtils.count(graph.vertices())
);
}
+
+ /**
+ * A sentinel exception used by {@link #shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows()} to verify
+ * that the exact original error thrown by an {@code executeInTx}/{@code evaluateInTx} body propagates to the
+ * caller unchanged.
+ */
+ private static class TxClosureSentinelException extends RuntimeException {
+ TxClosureSentinelException(final String message) {
+ super(message);
+ }
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS)
+ public void shouldCommitWhenTxClosureBodyCompletes() {
+ g.executeInTx(gtx -> gtx.addV("person").iterate());
+
+ // committed data is visible to reads outside the closure
+ assertEquals(1L, g.V().hasLabel("person").count().next().longValue());
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS)
+ public void shouldRollbackAndRethrowOriginalErrorWhenTxClosureBodyThrows() {
+ try {
+ g.executeInTx(gtx -> {
+ gtx.addV("person").iterate();
+ throw new TxClosureSentinelException("boom");
+ });
+ fail("The exact exception thrown by the closure body should propagate to the caller");
+ } catch (Exception ex) {
+ // (i) the exact original error (type + message) propagates to the caller, unchanged
+ assertThat(ex, instanceOf(TxClosureSentinelException.class));
+ assertEquals("boom", ex.getMessage());
+ }
+
+ // (ii) the vertex added before the error was rolled back and is not persisted
+ assertEquals(0L, g.V().hasLabel("person").count().next().longValue());
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS)
+ public void shouldReturnBodyValueFromTxClosureCall() {
+ final long count = g.evaluateInTx(gtx -> {
+ gtx.addV("person").iterate();
+ gtx.addV("person").iterate();
+ return gtx.V().hasLabel("person").count().next();
+ });
+
+ // the value computed in the body is returned to the caller
+ assertEquals(2L, count);
+
+ // the work also committed, so the data is visible afterward
+ assertEquals(2L, g.V().hasLabel("person").count().next().longValue());
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.VertexFeatures.class, feature = Graph.Features.VertexFeatures.FEATURE_ADD_VERTICES)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = FEATURE_TRANSACTIONS)
+ public void shouldRejectOpeningSecondTransactionInsideTxClosureBody() {
+ g.executeInTx(gtx -> {
+ // gtx.tx() itself is legitimate and returns the (same) transaction - it must NOT throw, as it is the
+ // standard way to commit/rollback when holding a transactional source.
+ final Transaction nested = gtx.tx();
+ assertNotNull(nested);
+
+ // opening a SECOND transaction from within an already-open one must raise. On the embedded impl
+ // (TinkerTransaction) begin() calls doOpen() unconditionally with no double-open guard, so the guard lives
+ // in AbstractTransaction.open() (transactionAlreadyOpen()) - hence the embedded nesting test asserts on
+ // open(), not begin().
+ try {
+ nested.open();
+ fail("Opening a second transaction from within an already-open one should raise");
+ } catch (Exception ex) {
+ // expected - a transaction is already open
+ assertThat(ex, instanceOf(IllegalStateException.class));
+ }
+
+ gtx.addV("person").iterate();
+ });
+
+ // the outer transaction still committed normally
+ assertEquals(1L, g.V().hasLabel("person").count().next().longValue());
+ }
}