This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-tx-client in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 77732f46432383cc636020704deacacc5f53de79 Author: Ken Hu <[email protected]> AuthorDate: Fri Mar 20 14:38:38 2026 -0700 Add HTTP transaction support to gremlin-driver Introduce HttpRemoteTransaction and TransactionRemoteConnection to manage transaction lifecycle. RequestSubmitter and RequestSubmitterAsync interfaces were extracted out of Client to ensure transactions would have the same submission interface. Since Clients and Transactions now have a similar submission interface, they are both obtained from a Cluster. Transactions are meant to be short-lived then closed and Clients remain long-lived and preferrably re-used. --- CHANGELOG.asciidoc | 1 + .../traversal/dsl/graph/GraphTraversalSource.java | 13 +- .../apache/tinkerpop/gremlin/driver/Client.java | 69 ++- .../apache/tinkerpop/gremlin/driver/Cluster.java | 82 +-- .../gremlin/driver/RemoteTransaction.java | 36 ++ .../tinkerpop/gremlin/driver/RequestOptions.java | 31 ++ .../tinkerpop/gremlin/driver/RequestSubmitter.java | 61 ++ .../gremlin/driver/RequestSubmitterAsync.java | 67 +++ .../driver/handler/HttpGremlinRequestEncoder.java | 7 + .../driver/remote/DriverRemoteConnection.java | 15 +- .../driver/remote/HttpRemoteTransaction.java | 317 +++++++++++ .../driver/remote/TransactionRemoteConnection.java | 106 ++++ .../gremlin/server/GremlinDriverIntegrateTest.java | 4 - .../GremlinDriverTransactionIntegrateTest.java | 617 +++++++++++++++++++++ .../gremlin/server/HttpDriverIntegrateTest.java | 16 - 15 files changed, 1369 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index fe123a3fae..5624866604 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added `__contains__` and `keys()` to `Element` in `gremlin-python`. * Added `subgraph()` support for `gremlin-python` so that results are stored in a detached `Graph` object. * Added support for remote transactions to the `gremlin-server` through `TransactionManager` and `UnmanagedTransaction`. +* Added support for transactions to `gremlin-driver` using the new `HttpRemoteTransaction`. * Modified grammar to make `discard()` usage more consistent as a filter step where it can now be used to chain additional traversal steps and be used anonymously. * Removed `Meta` field from `ResponseResult` struct in `gremlin-go`. * Removed deprecated elements of the Java-based process testing suite: `ProcessStandardSuite`, `ProcessComputerSuite`, `ProcessLimitedSuite` and associated tests. diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java index b1d417ee02..684d9e8609 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/dsl/graph/GraphTraversalSource.java @@ -703,14 +703,19 @@ public class GraphTraversalSource implements TraversalSource { /** * Proxies calls through to the underlying {@link Graph#tx()} or to the {@link RemoteConnection#tx()}. + * <p> + * When a remote connection is present, this method delegates to the connection's + * {@link RemoteConnection#tx()} method, which returns an appropriate transaction + * implementation for the remote connection type (e.g., {@code HttpRemoteTransaction} + * for HTTP-based connections). + * + * @return A {@link Transaction} for managing transactional operations */ public Transaction tx() { if (null == this.connection) return this.graph.tx(); - else { - throw new UnsupportedOperationException("TinkerPop 4 does not yet support remote transactions"); - } - + else + return this.connection.tx(); } /** diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 95af4270f5..26c24f2da2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -53,7 +53,7 @@ import java.util.stream.Collectors; * * @author Stephen Mallette (http://stephen.genoprime.com) */ -public abstract class Client { +public abstract class Client implements RequestSubmitter, RequestSubmitterAsync { private static final Logger logger = LoggerFactory.getLogger(Client.class); public static final String TOO_MANY_IN_FLIGHT_REQUESTS = "Number of active requests (%s) exceeds pool size (%s). " + @@ -239,6 +239,7 @@ public abstract class Client { options.getLanguage().ifPresent(lang -> request.addLanguage(lang)); options.getMaterializeProperties().ifPresent(mp -> request.addMaterializeProperties(mp)); options.getBulkResults().ifPresent(bulked -> request.addBulkResults(Boolean.parseBoolean(bulked))); + options.getTransactionId().ifPresent(transactionId -> request.addTransactionId(transactionId)); return submitAsync(request.create()); } @@ -286,6 +287,13 @@ public abstract class Client { return cluster; } + protected Host chooseRandomHost() { + cluster.init(); + final List<Host> hosts = new ArrayList<>(cluster.allHosts()); + final int ix = random.nextInt(hosts.size()); + return hosts.get(ix); + } + /** * A {@code Client} implementation. Requests are sent to multiple servers given a {@link LoadBalancingStrategy}. * Transactions are automatically committed (or rolled-back on error) after each request. @@ -358,12 +366,6 @@ public abstract class Client { return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); } - private Host chooseRandomHost() { - final List<Host> hosts = new ArrayList<>(cluster.allHosts()); - final int ix = random.nextInt(hosts.size()); - return hosts.get(ix); - } - /** * Initializes the connection pools on all hosts. */ @@ -561,5 +563,58 @@ public abstract class Client { if (close.isDone()) throw new IllegalStateException("Client is closed"); return new AliasClusteredClient(client, graphOrTraversalSource); } + + } + + /** + * A {@link Client} that pins all requests to a single {@link Host}. Used internally by transactions + * to ensure all requests within a transaction go to the same server. + * <p> + * This client is not intended to be used directly — obtain a {@link org.apache.tinkerpop.gremlin.structure.Transaction} + * via {@link Cluster#transact()} or {@link Cluster#transact(String)} instead. + */ + public static class PinnedClient extends Client { + private final ClusteredClient clusteredClient; + private final Host pinnedHost; + private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); + + PinnedClient(final Cluster cluster) { + super(cluster); + this.pinnedHost = chooseRandomHost(); + this.clusteredClient = cluster.connect(); + } + + public Host getPinnedHost() { + return pinnedHost; + } + + @Override + protected void initializeImplementation() { + this.clusteredClient.init(); + initialized = true; + } + + @Override + protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { + final ConnectionPool pool = clusteredClient.hostConnectionPools.get(pinnedHost); + if (pool == null) throw new NoHostAvailableException(); + return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isClosing() { + return closing.get() != null; + } + + /** + * Marks this client as closed. The underlying pool is owned by {@link ClusteredClient} and is not closed here. + */ + @Override + public synchronized CompletableFuture<Void> closeAsync() { + if (closing.get() != null) return closing.get(); + + closing.set(clusteredClient.closeAsync()); + return closing.get(); + } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index e080da8d51..cac193814c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -31,7 +31,10 @@ import org.apache.commons.configuration2.Configuration; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.tuple.Pair; import org.apache.tinkerpop.gremlin.driver.auth.Auth; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor; +import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction; +import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; @@ -63,6 +66,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -94,49 +98,31 @@ public final class Cluster { } /** - * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to - * a single server (randomly selected from the cluster), where the same bindings will be available on each request. - * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a - * single request. The transactions are managed by the user and must be committed or rolled-back manually. - * <p/> - * Note that calling this method does not imply that a connection is made to the server itself at this point. - * Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an - * error will not be raised at this point. Connections get initialized in the {@link Client} when a request is - * submitted or can be directly initialized via {@link Client#init()}. - * - * @param sessionId user supplied id for the session which should be unique (a UUID is ideal). + * Creates a new {@link Client} based on the settings provided. */ - public <T extends Client> T connect(final String sessionId) { - throw new UnsupportedOperationException("not implemented"); + public <T extends Client> T connect() { + final Client client = new Client.ClusteredClient(this); + manager.trackClient(client); + return (T) client; } /** - * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to - * a single server (randomly selected from the cluster), where the same bindings will be available on each request. - * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a - * single request. If {@code manageTransactions} is set to {@code false} then transactions are managed by the - * user and must be committed or rolled-back manually. When set to {@code true} the transaction is committed or - * rolled-back at the end of each request. - * <p/> - * Note that calling this method does not imply that a connection is made to the server itself at this point. - * Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an - * error will not be raised at this point. Connections get initialized in the {@link Client} when a request is - * submitted or can be directly initialized via {@link Client#init()}. - * - * @param sessionId user supplied id for the session which should be unique (a UUID is ideal). - * @param manageTransactions enables auto-transactions when set to true + * Creates a new {@link Transaction} using the server's default traversal source. + * The server will bind to "g" by default when no traversal source is specified. */ - public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) { - throw new UnsupportedOperationException("not implemented"); + public RemoteTransaction transact() { + return transact(null); } /** - * Creates a new {@link Client} based on the settings provided. + * Creates a new {@link Transaction} bound to the specified graph or traversal source. + * + * @param graphOrTraversalSource the graph/traversal source alias, or null to use the server default */ - public <T extends Client> T connect() { - final Client client = new Client.ClusteredClient(this); - manager.trackClient(client); - return (T) client; + public RemoteTransaction transact(final String graphOrTraversalSource) { + final Client.PinnedClient pinnedClient = new Client.PinnedClient(this); + manager.trackClient(pinnedClient); + return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource); } @Override @@ -365,6 +351,14 @@ public final class Cluster { return Collections.unmodifiableCollection(manager.allHosts()); } + public void trackTransaction(final HttpRemoteTransaction tx) { + manager.trackTransaction(tx); + } + + public void untrackTransaction(final HttpRemoteTransaction tx) { + manager.untrackTransaction(tx); + } + Factory getFactory() { return manager.factory; } @@ -936,6 +930,7 @@ public final class Cluster { class Manager { private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<>(); + private final Set<HttpRemoteTransaction> openTransactions = ConcurrentHashMap.newKeySet(); private boolean initialized; private final List<InetSocketAddress> contactPoints; private final Factory factory; @@ -1081,6 +1076,14 @@ public final class Cluster { openedClients.add(new WeakReference<>(client)); } + void trackTransaction(final HttpRemoteTransaction tx) { + openTransactions.add(tx); + } + + void untrackTransaction(final HttpRemoteTransaction tx) { + openTransactions.remove(tx); + } + public Host add(final InetSocketAddress address) { final Host newHost = new Host(address, Cluster.this); final Host previous = hosts.putIfAbsent(address, newHost); @@ -1096,6 +1099,17 @@ public final class Cluster { if (closeFuture.get() != null) return closeFuture.get(); + // best-effort rollback of any open transactions before closing + // snapshot to avoid concurrent modification since rollback() calls untrackTransaction() + new ArrayList<>(openTransactions).forEach(tx -> { + try { + tx.rollback(); + } catch (Exception e) { + logger.warn("Failed to rollback transaction on cluster close", e); + } + }); + openTransactions.clear(); + final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size()); for (WeakReference<Client> openedClient : openedClients) { final Client client = openedClient.get(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RemoteTransaction.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RemoteTransaction.java new file mode 100644 index 0000000000..d4505f6d8c --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RemoteTransaction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import org.apache.tinkerpop.gremlin.structure.Transaction; + +/** + * A {@link Transaction} interface for remote connections that combine lifecycle management and synchronous submission. + * + * Note: Implementations of this interface are generally <b>NOT</b> thread-safe. + */ +public interface RemoteTransaction extends Transaction, RequestSubmitter { + /** + * Returns the server-generated transaction ID, or {@code null} if the transaction has not yet been started via + * {@link #begin(Class)}. + * + * @return the transaction ID, or null if not yet begun + */ + String getTransactionId(); +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java index a43e186164..c51f2b8516 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java @@ -50,6 +50,7 @@ public final class RequestOptions { private final String language; private final String materializeProperties; private final String bulkResults; + private final String transactionId; private RequestOptions(final Builder builder) { this.graphOrTraversalSource = builder.graphOrTraversalSource; @@ -59,6 +60,7 @@ public final class RequestOptions { this.language = builder.language; this.materializeProperties = builder.materializeProperties; this.bulkResults = builder.bulkResults; + this.transactionId = builder.transactionId; } public Optional<String> getG() { @@ -85,6 +87,8 @@ public final class RequestOptions { public Optional<String> getBulkResults() { return Optional.ofNullable(bulkResults); } + public Optional<String> getTransactionId() { return Optional.ofNullable(transactionId); } + public static Builder build() { return new Builder(); } @@ -125,6 +129,25 @@ public final class RequestOptions { private String materializeProperties = null; private String language = null; private String bulkResults = null; + private String transactionId = null; + + /** + * Creates a {@link Builder} populated with the values from the provided {@link RequestOptions}. + * @param options the options to copy from + * @return a {@link Builder} with the copied options + */ + public static Builder from(final RequestOptions options) { + final Builder builder = build(); + builder.graphOrTraversalSource = options.graphOrTraversalSource; + builder.parameters = options.parameters; + builder.batchSize = options.batchSize; + builder.timeout = options.timeout; + builder.materializeProperties = options.materializeProperties; + builder.language = options.language; + builder.bulkResults = options.bulkResults; + builder.transactionId = options.transactionId; + return builder; + } /** * The aliases to set on the request. @@ -196,6 +219,14 @@ public final class RequestOptions { return this; } + /** + * Sets the transactionId value to be sent on the request. + */ + public Builder transactionId(final String id) { + this.transactionId = id; + return this; + } + public RequestOptions create() { return new RequestOptions(this); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java new file mode 100644 index 0000000000..3afc8e099d --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import java.util.Map; + +/** + * Defines the synchronous request submission contract for Gremlin requests. + * <p> + * This interface is implemented by both {@link Client} and transaction classes to ensure a consistent API for + * submitting Gremlin scripts. The synchronous nature of these methods means they block until the request completes. + * <p> + * For asynchronous submission, see {@link RequestSubmitterAsync}. + */ +public interface RequestSubmitter { + + /** + * Submits a Gremlin script and blocks until the response is received. + * + * @param gremlin the Gremlin script to execute + * @return the results of the script execution + */ + ResultSet submit(String gremlin); + + /** + * Submits a Gremlin script with bound parameters and blocks until the response is received. + * <p> + * Prefer this method over string concatenation when executing scripts with variable + * arguments, as parameterized scripts perform better. + * + * @param gremlin the Gremlin script to execute + * @param parameters a map of parameters that will be bound to the script on execution + * @return the results of the script execution + */ + ResultSet submit(String gremlin, Map<String, Object> parameters); + + /** + * Submits a Gremlin script with request options and blocks until the response is received. + * + * @param gremlin the Gremlin script to execute + * @param options the options to supply for this request + * @return the results of the script execution + */ + ResultSet submit(String gremlin, RequestOptions options); +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java new file mode 100644 index 0000000000..c94ed5bb72 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestSubmitterAsync.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Defines the asynchronous request submission contract for Gremlin requests. + * <p> + * This interface is implemented by {@link Client} to provide non-blocking request submission. The returned + * {@link CompletableFuture} completes when the write of the request is complete, not when the response is received. + * <p> + * Note: Transaction classes intentionally do not implement this interface because transactional operations require + * sequential execution to maintain ordering guarantees over HTTP. + * <p> + * For synchronous submission, see {@link RequestSubmitter}. + */ +public interface RequestSubmitterAsync { + + /** + * Submits a Gremlin script asynchronously. + * <p> + * The returned future completes when the write of the request is complete. + * + * @param gremlin the Gremlin script to execute + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin); + + /** + * Submits a Gremlin script with bound parameters asynchronously. + * <p> + * Prefer this method over string concatenation when executing scripts with variable + * arguments, as parameterized scripts perform better. + * + * @param gremlin the Gremlin script to execute + * @param parameters a map of parameters that will be bound to the script on execution + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin, Map<String, Object> parameters); + + /** + * Submits a Gremlin script with request options asynchronously. + * + * @param gremlin the Gremlin script to execute + * @param options the options to supply for this request + * @return a future that completes with the results when the request write is complete + */ + CompletableFuture<ResultSet> submitAsync(String gremlin, RequestOptions options); +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java index c94d74264d..0f8ede4c7f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -91,6 +91,13 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req if (bulkResults) { headersMap.put(Tokens.BULK_RESULTS, "true"); } + + // Add X-Transaction-Id header to comply with specification's dual transmission (header and body) + final String transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID); + if (transactionId != null) { + headersMap.put(Tokens.Headers.TRANSACTION_ID, transactionId); + } + HttpRequest gremlinRequest = new HttpRequest(headersMap, requestMessage, uri); for (final Pair<String, ? extends RequestInterceptor> interceptor : interceptors) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index c8d56814c4..c909e48604 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -248,15 +248,14 @@ public class DriverRemoteConnection implements RemoteConnection { } /** - * Constructs a new {@link DriverRemoteTransaction}. Not yet supported in TinkerPop 4. + * Creates a new {@link HttpRemoteTransaction} for executing transactional operations. + * + * @return A new {@link HttpRemoteTransaction} */ -// @Override -// public Transaction tx() { -// // todo: not implemented -// final DriverRemoteConnection session = new DriverRemoteConnection( -// client.getCluster().connect(), remoteTraversalSourceName, true); -// return new DriverRemoteTransaction(session); -// } + @Override + public Transaction tx() { + return client.getCluster().transact(remoteTraversalSourceName); + } @Override public String toString() { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java new file mode 100644 index 0000000000..a801fd197a --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.remote; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Host; +import org.apache.tinkerpop.gremlin.driver.RemoteTransaction; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.util.TransactionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * A {@link Transaction} implementation for HTTP-based remote connections. + * <p> + * This class provides synchronous, sequential request execution within a transaction context. + * All requests are pinned to a single host and include the transaction ID (after begin()). + * <p> + * Key characteristics: + * <ul> + * <li>Synchronous API only - no submitAsync() methods</li> + * <li>Host pinning - all requests go to the same server</li> + * <li>Sequential execution - requests block until complete</li> + * </ul> + * <p> + * Usage: + * <pre> + * Transaction tx = cluster.transact("g"); + * GraphTraversalSource gtx = tx.begin(); + * gtx.addV("person").property("name", "alice").iterate(); + * tx.commit(); + * </pre> + * + * This class is <b>NOT</b> thread-safe. + */ +public class HttpRemoteTransaction implements RemoteTransaction { + private static final Logger logger = LoggerFactory.getLogger(HttpRemoteTransaction.class); + private static final long CLOSING_MAX_WAIT_MS = 10000; + + protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT; + private final Client.PinnedClient pinnedClient; + private final Cluster cluster; + private final Host pinnedHost; + private final String graphAlias; + private String transactionId; // null until begin(), set from server response + private TransactionState state = TransactionState.NOT_STARTED; + + private enum TransactionState { + NOT_STARTED, OPEN, CLOSED + } + + /** + * Creates a new HTTP transaction. + * <p> + * The transaction is not started until {@link #begin(Class)} is called. + * A host is selected at creation time and all requests will be pinned to it. + * + * @param pinnedClient the underlying client for connection access + * @param graphAlias the graph/traversal source alias (e.g., "g") + * @throws NoHostAvailableException if no hosts are available in the cluster + */ + public HttpRemoteTransaction(final Client.PinnedClient pinnedClient, final String graphAlias) { + this.pinnedClient = pinnedClient; + this.graphAlias = graphAlias; + this.pinnedHost = pinnedClient.getPinnedHost(); + this.cluster = pinnedClient.getCluster(); + } + + /** + * Not supported for remote transactions. Use {@link #begin(Class)} instead. + * + * @throws UnsupportedOperationException always + */ + @Override + public void open() { + begin(); + } + + /** + * Starts a transaction and returns a traversal source bound to it. + * <p> + * This method sends {@code g.tx().begin()} to the server, which returns + * the transaction ID. All subsequent requests will include this ID. + * + * @param traversalSourceClass the class of the traversal source to create + * @param <T> the type of the traversal source + * @return a new traversal source bound to this transaction + * @throws IllegalStateException if the transaction is already started + * @throws RuntimeException if the transaction fails to begin + */ + @Override + public <T extends TraversalSource> T begin(final Class<T> traversalSourceClass) { + if (state != TransactionState.NOT_STARTED) { + throw new IllegalStateException("Transaction already started"); + } + cluster.trackTransaction(this); + + try { + // Send begin - no txId attached yet + final ResultSet rs = submitInternal("g.tx().begin()"); + + // Server returns the transaction ID + this.transactionId = extractTransactionId(rs); + this.state = TransactionState.OPEN; + } catch (Exception e) { + cleanUp(); + throw new RuntimeException("Failed to begin transaction: " + e.getMessage(), e); + } + + // Create RemoteConnection for the traversal source + final TransactionRemoteConnection txConnection = new TransactionRemoteConnection(this); + + try { + return traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(txConnection); + } catch (Exception e) { + rollback(); + throw new IllegalStateException("Failed to create TraversalSource", e); + } + } + + /** + * Extracts the transaction ID from the begin() response. + * <p> + * The server returns the transaction ID as part of the response to g.tx().begin(). + * + * @param rs the result set from the begin request + * @return the transaction ID + */ + private String extractTransactionId(final ResultSet rs) { + // Wait for all results and extract the transaction ID + final List<Result> results = rs.all().join(); + if (results.isEmpty()) { + throw new IllegalStateException("Server did not return transaction ID"); + } + try { + final Object id = results.get(0).get(Map.class).get("transactionId"); + if (id == null) throw new IllegalStateException("Server did not return transaction ID"); + + final String idStr = id.toString(); + if (idStr.isBlank()) throw new IllegalStateException("Server returned empty transaction ID"); + + return idStr; + } catch (Exception e) { + throw new IllegalStateException("Server did not return transaction ID"); + } + } + + /** + * Commits the transaction. + * <p> + * Sends {@code g.tx().commit()} to the server and closes the transaction. + * + * @throws IllegalStateException if the transaction is not open + * @throws RuntimeException if the commit fails + */ + @Override + public void commit() { + closeRemoteTransaction("g.tx().commit()"); + } + + /** + * Rolls back the transaction. + * <p> + * Sends {@code g.tx().rollback()} to the server and closes the transaction. + * This is best-effort - errors are logged but not thrown. + */ + @Override + public void rollback() { + closeRemoteTransaction("g.tx().rollback()"); + } + + private void closeRemoteTransaction(final String closeScript) { + if (state != TransactionState.OPEN) throw new IllegalStateException("Transaction is not open"); + + try { + submitInternal(closeScript).all().get(CLOSING_MAX_WAIT_MS, TimeUnit.MILLISECONDS); + cleanUp(); + } catch (Exception e) { + logger.warn("Failed to {} transaction on {}", closeScript, pinnedHost); + throw new TransactionException("Failed to " + closeScript, e); + } + } + + private void cleanUp() { + state = TransactionState.CLOSED; + cluster.untrackTransaction(this); + } + + /** + * Returns the server-generated transaction ID, or {@code null} if the transaction + * has not yet been started via {@link #begin(Class)}. + * + * @return the transaction ID, or null if not yet begun + */ + public String getTransactionId() { + return transactionId; + } + + @Override + public boolean isOpen() { + return state == TransactionState.OPEN; + } + + @Override + public void readWrite() { + throw new UnsupportedOperationException("Remote transaction behaviors are not configurable - they are always manually controlled"); + } + + @Override + public void close() { + closeConsumer.accept(this); + + // this is just for safety in case of custom closeConsumer but should normally be handled by commit/rollback + cleanUp(); + } + + @Override + public Transaction onReadWrite(final Consumer<Transaction> consumer) { + throw new UnsupportedOperationException("Remote transaction behaviors are not configurable - they are always manually controlled"); + } + + @Override + public Transaction onClose(final Consumer<Transaction> consumer) { + this.closeConsumer = consumer; + return this; + } + + @Override + public void addTransactionListener(final Consumer<Status> listener) { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public void removeTransactionListener(final Consumer<Status> listener) { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public void clearTransactionListeners() { + throw new UnsupportedOperationException("Remote transactions cannot have listeners attached"); + } + + @Override + public ResultSet submit(final String gremlin) { + return submit(gremlin, RequestOptions.EMPTY); + } + + @Override + public ResultSet submit(final String gremlin, final Map<String, Object> parameters) { + final RequestOptions.Builder builder = RequestOptions.build(); + if (parameters != null && !parameters.isEmpty()) { + parameters.forEach(builder::addParameter); + } + return submit(gremlin, builder.create()); + } + + @Override + public ResultSet submit(final String gremlin, final RequestOptions options) { + if (state != TransactionState.OPEN) { + throw new IllegalStateException("Transaction is not open"); + } + return submitInternal(gremlin, options); + } + + private ResultSet submitInternal(final String gremlin) { + return submitInternal(gremlin, RequestOptions.EMPTY); + } + + // synchronized here is a bit defensive but ensures that even if a user accidentally uses this in different threads, + // the server will still receive the requests in the correct order + private synchronized ResultSet submitInternal(final String gremlin, final RequestOptions options) { + final RequestOptions.Builder builder = RequestOptions.Builder.from(options); + if (graphAlias != null) { + // Don't allow per-request override of "g" as transactions should only target a single Graph instance. + builder.addG(graphAlias); + } + + // Attach txId if we have one (not present for begin()) + if (transactionId != null) { + builder.transactionId(transactionId); + } + + try { + return pinnedClient.submit(gremlin, builder.create()); + } catch (Exception e) { + throw new RuntimeException("Transaction request failed: " + e.getMessage(), e); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java new file mode 100644 index 0000000000..6ea9ce5e28 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/TransactionRemoteConnection.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.remote; + +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; +import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.GremlinLang; +import org.apache.tinkerpop.gremlin.structure.Transaction; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.apache.tinkerpop.gremlin.driver.RequestOptions.getRequestOptions; + +/** + * A {@link RemoteConnection} that routes all submissions through an {@link HttpRemoteTransaction}. + * <p> + * This connection adapts the synchronous transaction API to the async {@link RemoteConnection} + * interface required by {@link org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource}. + * <p> + */ +class TransactionRemoteConnection implements RemoteConnection { + + private final HttpRemoteTransaction transaction; + + /** + * Creates a new connection bound to the specified transaction. + * + * @param transaction the transaction that owns this connection + */ + TransactionRemoteConnection(final HttpRemoteTransaction transaction) { + this.transaction = transaction; + } + + /** + * Submits a traversal through the transaction. + * <p> + * The submission is synchronous internally but returns a completed future + * to satisfy the {@link RemoteConnection} interface. + * + * @param gremlinLang the traversal to submit + * @return a completed future with the traversal results + * @throws RemoteConnectionException if the transaction is not open or submission fails + */ + @Override + public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final GremlinLang gremlinLang) + throws RemoteConnectionException { + if (!transaction.isOpen()) { + throw new RemoteConnectionException("Transaction is not open"); + } + + try { + // Synchronous submission through transaction + final ResultSet rs = transaction.submit(gremlinLang.getGremlin(), getRequestOptions(gremlinLang)); + + final RemoteTraversal<?, E> traversal = new DriverRemoteTraversal<>(rs, + null, // client not needed for iteration + false, // attachElements + Optional.empty()); + + return CompletableFuture.completedFuture(traversal); + } catch (Exception e) { + throw new RemoteConnectionException(e); + } + } + + /** + * Returns the owning transaction. + * + * @return the transaction that owns this connection + */ + @Override + public Transaction tx() { + return transaction; + } + + /** + * No-op close implementation. + * <p> + * The transaction manages its own lifecycle - users should call + * {@link Transaction#commit()} or {@link Transaction#rollback()} explicitly. + */ + @Override + public void close() { + // Transaction manages its own lifecycle - don't close it here + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 7b051812c7..89ad207135 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -1094,13 +1094,11 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration public void shouldCloseAllClientsOnCloseOfCluster() throws Exception { final Cluster cluster = TestClientFactory.open(); final Client sessionlessOne = cluster.connect(); - final Client session = cluster.connect("session"); final Client sessionlessTwo = cluster.connect(); final Client sessionlessThree = cluster.connect(); final Client sessionlessFour = cluster.connect(); assertEquals(2, sessionlessOne.submit("g.inject(2)").all().get().get(0).getInt()); - assertEquals(2, session.submit("g.inject(2)").all().get().get(0).getInt()); assertEquals(2, sessionlessTwo.submit("g.inject(2)").all().get().get(0).getInt()); assertEquals(2, sessionlessThree.submit("g.inject(2)").all().get().get(0).getInt()); // dont' send anything on the 4th client @@ -1119,7 +1117,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } try { - session.submit("g.inject(2)").all().get(); fail("Should have tossed an exception because cluster was closed"); } catch (Exception ex) { final Throwable root = ExceptionHelper.getRootCause(ex); @@ -1156,7 +1153,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration // allow call to close() even though closed through cluster sessionlessOne.close(); - session.close(); sessionlessTwo.close(); cluster.close(); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java new file mode 100644 index 0000000000..2198db5525 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.RemoteTransaction; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.util.ExceptionHelper; +import org.apache.tinkerpop.gremlin.util.Tokens; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Integration tests for HTTP remote transactions using the driver API ({@code Cluster.transact()}). + * + * Tests exercise {@link RemoteTransaction} directly via its {@code submit()} methods. + * + * Server-side verification tests (raw HTTP) are in {@link GremlinServerHttpTransactionIntegrateTest}. + */ +public class GremlinDriverTransactionIntegrateTest extends AbstractGremlinServerIntegrationTest { + private static final String GTX = "gtx"; + private static final int MAX_GET_WAIT = 5000; + + private Cluster cluster; + + @Before + public void openCluster() { + cluster = TestClientFactory.open(); + } + + @After + public void closeCluster() throws Exception { + if (cluster != null) cluster.close(); + } + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + final String nameOfTest = name.getMethodName(); + switch (nameOfTest) { + case "shouldEnforceMaxConcurrentTransactions": + settings.maxConcurrentTransactions = 1; + break; + case "shouldTimeoutIdleTransaction": + case "shouldTimeoutIdleTransactionWithNoOperations": + case "shouldRejectLateCommitAfterTimeout": + settings.transactionTimeout = 1000; + break; + case "shouldTimeoutOnlyIdleTransactionNotActiveOne": + settings.transactionTimeout = 2000; + break; + } + return settings; + } + + /** + * Begin a transaction, add a vertex, verify isolation, commit, verify isOpen transitions, and verify data persists + * after commit. + */ + @Test + public void shouldCommitTransaction() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + // #4: isOpen true after begin + assertTrue(tx.isOpen()); + + tx.submit("g.addV('person').property('name','alice')"); + + // #6: uncommitted data not visible outside the transaction + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + tx.commit(); + // #4: isOpen false after commit + assertFalse(tx.isOpen()); + // #1, #7: committed data visible to non-transactional reads + assertEquals(1L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + @Test + public void shouldRollbackTransaction() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + assertTrue(tx.isOpen()); + + tx.submit("g.addV('person').property('name','bob')"); + + tx.rollback(); + // #5: isOpen false after rollback + assertFalse(tx.isOpen()); + // #2: data discarded after rollback + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").one().getLong()); + + client.close(); + } + + @Test + public void shouldSupportIntraTransactionConsistency() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('test').property('name','A')"); + // #8: read-your-own-writes — vertex A visible within the transaction + assertEquals(1L, tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + + tx.submit("g.addV('test').property('name','B')"); + tx.submit("g.V().has('name','A').addE('knows').to(V().has('name','B'))"); + + // verify the full subgraph is visible within the transaction before commit + assertEquals(2L, tx.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + assertEquals(1L, tx.submit("g.V().outE('knows').count()").all().get().get(0).getLong()); + + tx.commit(); + + // #3: vertices and edges persist after commit + assertEquals(2L, client.submit("g.V().hasLabel('test').count()").all().get().get(0).getLong()); + assertEquals(1L, client.submit("g.E().hasLabel('knows').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldThrowOnSubmitAfterCommit() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV()"); + tx.commit(); + + try { + tx.submit("g.V().count()"); + fail("Expected exception on submit after commit"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + @Test + public void shouldThrowOnSubmitAfterRollback() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV()"); + tx.rollback(); + + try { + tx.submit("g.V().count()"); + fail("Expected exception on submit after rollback"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + @Test + public void shouldThrowOnDoubleBegin() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + + try { + tx.begin(); + fail("Expected IllegalStateException on second begin()"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction already started")); + } + } + + @Test + public void shouldThrowOnCommitWhenNotOpen() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + assertFalse(tx.isOpen()); + + try { + tx.commit(); + fail("Expected IllegalStateException on commit when not open"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + @Test + public void shouldThrowOnRollbackWhenNotOpen() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + assertFalse(tx.isOpen()); + + try { + tx.rollback(); + fail("Expected IllegalStateException on rollback when not open"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction is not open")); + } + } + + @Test + public void shouldReturnNullTransactionIdBeforeBegin() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + + // before begin, transactionId should be null + assertNull(tx.getTransactionId()); + + tx.begin(); + // after begin, transactionId should be non-null and non-blank + assertNotNull(tx.getTransactionId()); + assertFalse(tx.getTransactionId().isBlank()); + } + + @Test + public void shouldCommitOnCloseByDefault() throws Exception { + final RemoteTransaction tx1 = cluster.transact(GTX); + tx1.begin(); + tx1.submit("g.addV('person').property('name','close_test')"); + // close() should trigger default COMMIT behavior + tx1.close(); + assertFalse(tx1.isOpen()); + + final RemoteTransaction tx2 = cluster.transact(GTX); + tx2.begin(); + assertEquals(1L, tx2.submit("g.V().hasLabel('person').count()").one().getLong()); + } + + @Test + public void shouldRollbackOnCloseWhenConfigured() throws Exception { + final RemoteTransaction tx1 = cluster.transact(GTX); + tx1.onClose(Transaction.CLOSE_BEHAVIOR.ROLLBACK); + tx1.begin(); + tx1.submit("g.addV('person').property('name','rollback_close_test')"); + tx1.close(); + assertFalse(tx1.isOpen()); + + final RemoteTransaction tx2 = cluster.transact(GTX); + tx2.begin(); + assertEquals(0L, tx2.submit("g.V().hasLabel('person').count()").one().getLong()); + } + + @Test + public void shouldRollbackOpenTransactionsOnClusterClose() throws Exception { + final RemoteTransaction tx1 = cluster.transact(GTX); + tx1.begin(); + tx1.submit("g.addV('cluster-close')"); + tx1.submit("g.addV('cluster-close')"); + + final RemoteTransaction tx2 = cluster.transact(GTX); + tx2.begin(); + tx2.submit("g.addV('cluster-close')"); + + // close cluster without committing — should rollback open transactions + cluster.close(); + cluster = null; + + // reconnect and verify data was not persisted + final Cluster cluster2 = TestClientFactory.open(); + try { + final RemoteTransaction cluster2tx1 = cluster2.transact(GTX); + cluster2tx1.begin(); + assertEquals(0L, cluster2tx1.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong()); + } finally { + cluster2.close(); + } + } + + @Test + public void shouldEnforceMaxConcurrentTransactions() throws Exception { + // first transaction fills the single slot + final RemoteTransaction tx1 = cluster.transact(GTX); + tx1.begin(); + + // second transaction should fail + try { + final RemoteTransaction tx2 = cluster.transact(GTX); + tx2.begin(); + fail("Expected exception when max concurrent transactions exceeded"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Maximum concurrent transactions exceeded")); + } + } + + @Test + public void shouldTimeoutIdleTransaction() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('timeout_test')"); + + // wait for the transaction to timeout + Thread.sleep(2000); + + // the transaction was rolled back server-side; attempting to commit should fail + try { + tx.commit(); + fail("Expected exception on commit after server-side timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // verify data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('timeout_test').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldTimeoutIdleTransactionWithNoOperations() throws Exception { + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + + // wait for the transaction to timeout + Thread.sleep(2000); + + // attempting to commit should fail because the server rolled back + try { + tx.commit(); + fail("Expected exception on commit after server-side timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + } + + @Test + public void shouldTimeoutOnlyIdleTransactionNotActiveOne() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction txActive = cluster.transact(GTX); + txActive.begin(); + txActive.submit("g.addV('active')"); + + final RemoteTransaction txIdle = cluster.transact(GTX); + txIdle.begin(); + txIdle.submit("g.addV('idle')"); + + // keep the active transaction alive by sending requests at intervals shorter than timeout + for (int i = 0; i < 3; i++) { + Thread.sleep(800); + txActive.submit("g.V().count()"); + } + + // by now the idle transaction should have timed out (2000ms elapsed) + // the active transaction should still be alive + txActive.commit(); + + // verify active transaction's data persisted + assertEquals(1L, client.submit("g.V().hasLabel('active').count()").all().get().get(0).getLong()); + + // idle transaction should have been rolled back by timeout + try { + txIdle.commit(); + fail("Expected exception on commit of timed-out idle transaction"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // verify idle transaction's data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('idle').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldRejectLateCommitAfterTimeout() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('person').property('name','late_commit')"); + + // wait for timeout + Thread.sleep(2000); + + // attempt commit — should fail because server already rolled back + try { + tx.commit(); + fail("Expected exception on late commit after timeout"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Transaction not found")); + } + + // verify data was not persisted + assertEquals(0L, client.submit("g.V().hasLabel('person').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldIsolateConcurrentTransactions() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx1 = cluster.transact(GTX); + tx1.begin(); + final RemoteTransaction tx2 = cluster.transact(GTX); + tx2.begin(); + + tx1.submit("g.addV('tx1')"); + tx2.submit("g.addV('tx2')"); + + // tx1 should not see tx2's data and vice versa + assertEquals(0L, tx1.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong()); + assertEquals(0L, tx2.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong()); + + tx1.commit(); + tx2.commit(); + + // both should be visible after commit + assertEquals(1L, client.submit("g.V().hasLabel('tx1').count()").all().get().get(0).getLong()); + assertEquals(1L, client.submit("g.V().hasLabel('tx2').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldOpenAndCloseManyTransactionsSequentially() throws Exception { + final Client client = cluster.connect().alias(GTX); + final long numberOfTransactions = 50; + + for (int i = 0; i < numberOfTransactions; i++) { + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('stress')"); + tx.commit(); + } + + final long count = client.submit("g.V().hasLabel('stress').count()").all().get().get(0).getLong(); + assertEquals(numberOfTransactions, count); + + Thread.sleep(100); + // this should be 0, but to prevent flakiness, make it a reasonable number less than numberOfTransactions + assertTrue(server.getServerGremlinExecutor().getTransactionManager().getActiveTransactionCount() < 35); + } + + @Test + public void shouldIsolateTransactionalAndNonTransactionalRequests() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('tx_data')"); + + // non-transactional read should not see uncommitted tx data + assertEquals(0L, client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong()); + + tx.commit(); + + // now the data should be visible + assertEquals(1L, client.submit("g.V().hasLabel('tx_data').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldRejectBeginOnNonTransactionalGraph() throws Exception { + final RemoteTransaction tx = cluster.transact("gclassic"); + try { + tx.begin(); + fail("Expected exception when beginning transaction on non-transactional graph"); + } catch (Exception ex) { + final Throwable root = ExceptionHelper.getRootCause(ex); + assertThat(root.getMessage(), containsString("Graph does not support transactions")); + } + } + + @Test + public void shouldTargetCorrectGraph() throws Exception { + final Client client = cluster.connect(); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + + tx.submit("g.addV('routed')"); + tx.commit(); + + // vertex should exist in the transactional graph (gtx) + assertEquals(1L, client.submit("g.V().hasLabel('routed').count()", + RequestOptions.build().addG(GTX).create()).all().get().get(0).getLong()); + + // vertex should NOT exist in the classic graph (gclassic) + assertEquals(0L, client.submit("g.V().hasLabel('routed').count()", + RequestOptions.build().addG("gclassic").create()).all().get().get(0).getLong()); + } + + @Test + public void shouldAutoCommitNonTransactionalWrite() throws Exception { + final Client client = cluster.connect(); + client.submit("g.addV('auto')").all().get(); + assertEquals(1L, client.submit("g.V().hasLabel('auto').count()").one().getLong()); + } + + @Test + public void shouldUseProvidedRequestOptions() throws Exception { + final Client client = cluster.connect().alias(GTX); + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + final RequestOptions ro = RequestOptions.build(). + addParameter("x", "vowels"). + materializeProperties(Tokens.MATERIALIZE_PROPERTIES_TOKENS). + create(); + final Vertex v = tx.submit("g.addV(x).property('a', 'b')", ro). + all(). + get(MAX_GET_WAIT, TimeUnit.MILLISECONDS). + get(0). + getVertex(); + assertFalse(v.properties().hasNext()); + tx.commit(); + + final List<Result> results = + client.submit("g.V().hasLabel('vowels')").all().get(MAX_GET_WAIT, TimeUnit.MILLISECONDS); + assertEquals(1L, results.size()); + } + + @Test + public void shouldUseProvidedParameters() throws Exception { + final Client client = cluster.connect().alias(GTX); + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + final Map params = new HashMap(); + params.put("x", "consonants"); + tx.submit("g.addV(x)", params); + tx.commit(); + + final List<Result> results = + client.submit("g.V().hasLabel('consonants')").all().get(MAX_GET_WAIT, TimeUnit.MILLISECONDS); + assertEquals(1L, results.size()); + } + + @Test + public void shouldCleanUpOnBeginFailure() throws Exception { + final RemoteTransaction tx = cluster.transact("gclassic"); + + try { + tx.begin(); + fail("Expected exception on begin for non-transactional graph"); + } catch (RuntimeException ex) { + assertThat(ex.getMessage(), containsString("Failed to begin transaction")); + } + + // verify cleanup: transaction is closed and has no ID + assertFalse(tx.isOpen()); + assertNull(tx.getTransactionId()); + + // second begin should fail — state moved to CLOSED, not back to NOT_STARTED + try { + tx.begin(); + fail("Expected IllegalStateException on begin after failed begin"); + } catch (IllegalStateException ex) { + assertThat(ex.getMessage(), containsString("Transaction already started")); + } + } + + @Test + public void shouldKeepTransactionOpenAfterTraversalError() throws Exception { + final Client client = cluster.connect().alias(GTX); + + final RemoteTransaction tx = cluster.transact(GTX); + tx.begin(); + tx.submit("g.addV('good_vertex')"); + + // submit a bad traversal that should fail + try { + tx.submit("g.V().fail()"); + } catch (Exception ex) { + // expected error from bad traversal + } + + // transaction should still be open — rollback should work + assertTrue(tx.isOpen()); + tx.rollback(); + + assertFalse(tx.isOpen()); + assertEquals(0L, client.submit("g.V().hasLabel('good_vertex').count()").all().get().get(0).getLong()); + } + + @Test + public void shouldWorkWithDriverRemoteConnection() throws Exception { + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, GTX)); + final GraphTraversalSource gtx = g.tx().begin(); + gtx.addV("val").iterate(); + gtx.tx().commit(); + + assertEquals(1L, g.V().hasLabel("val").count().next().longValue()); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java index 17563cdd8b..f6452abd09 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java @@ -209,22 +209,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } - @Test - public void shouldFailToUseSession() { - final Cluster cluster = TestClientFactory.build().create(); - try { - final Client client = cluster.connect("shouldFailToUseSession"); - client.submit("g.inject(2)").all().get(); - fail("Can't use session with HTTP"); - } catch (Exception ex) { - final Throwable t = ExceptionUtils.getRootCause(ex); - // assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); - assertEquals("not implemented", t.getMessage()); - } finally { - cluster.close(); - } - } - @Test public void shouldDeserializeErrorWithGraphBinary() { final Cluster cluster = TestClientFactory.build().create();
