xiazcy commented on code in PR #3329:
URL: https://github.com/apache/tinkerpop/pull/3329#discussion_r2977571982


##########
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/HttpRemoteTransaction.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.remote;
+
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Host;
+import org.apache.tinkerpop.gremlin.driver.RemoteTransaction;
+import org.apache.tinkerpop.gremlin.driver.RequestOptions;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
+import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.apache.tinkerpop.gremlin.structure.util.TransactionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A {@link Transaction} implementation for HTTP-based remote connections.
+ * <p>
+ * This class provides synchronous, sequential request execution within a 
transaction context.
+ * All requests are pinned to a single host and include the transaction ID 
(after begin()).
+ * <p>
+ * Key characteristics:
+ * <ul>
+ *   <li>Synchronous API only - no submitAsync() methods</li>
+ *   <li>Host pinning - all requests go to the same server</li>
+ *   <li>Sequential execution - requests block until complete</li>
+ * </ul>
+ * <p>
+ * Usage:
+ * <pre>
+ * Transaction tx = cluster.transact("g");
+ * GraphTraversalSource gtx = tx.begin();
+ * gtx.addV("person").property("name", "alice").iterate();
+ * tx.commit();
+ * </pre>
+ *
+ * This class is <b>NOT</b> thread-safe.
+ */
+public class HttpRemoteTransaction implements RemoteTransaction {
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpRemoteTransaction.class);
+    private static final long CLOSING_MAX_WAIT_MS = 10000;
+
+    protected Consumer<Transaction> closeConsumer = CLOSE_BEHAVIOR.COMMIT;
+    private final Client.PinnedClient pinnedClient;
+    private final Cluster cluster;
+    private final Host pinnedHost;
+    private final String graphAlias;
+    private String transactionId;  // null until begin(), set from server 
response
+    private TransactionState state = TransactionState.NOT_STARTED;
+
+    private enum TransactionState {
+        NOT_STARTED, OPEN, CLOSED
+    }
+
+    /**
+     * Creates a new HTTP transaction.
+     * <p>
+     * The transaction is not started until {@link #begin(Class)} is called.
+     * A host is selected at creation time and all requests will be pinned to 
it.
+     *
+     * @param pinnedClient the underlying client for connection access
+     * @param graphAlias the graph/traversal source alias (e.g., "g")
+     * @throws NoHostAvailableException if no hosts are available in the 
cluster
+     */
+    public HttpRemoteTransaction(final Client.PinnedClient pinnedClient, final 
String graphAlias) {
+        this.pinnedClient = pinnedClient;
+        this.graphAlias = graphAlias;
+        this.pinnedHost = pinnedClient.getPinnedHost();
+        this.cluster = pinnedClient.getCluster();
+    }
+
+    /**
+     * Not supported for remote transactions. Use {@link #begin(Class)} 
instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void open() {
+        begin();
+    }
+
+    /**
+     * Starts a transaction and returns a traversal source bound to it.
+     * <p>
+     * This method sends {@code g.tx().begin()} to the server, which returns
+     * the transaction ID. All subsequent requests will include this ID.
+     *
+     * @param traversalSourceClass the class of the traversal source to create
+     * @param <T> the type of the traversal source
+     * @return a new traversal source bound to this transaction
+     * @throws IllegalStateException if the transaction is already started
+     * @throws RuntimeException if the transaction fails to begin
+     */
+    @Override
+    public <T extends TraversalSource> T begin(final Class<T> 
traversalSourceClass) {
+        if (state != TransactionState.NOT_STARTED) {
+            throw new IllegalStateException("Transaction already started");
+        }
+        cluster.trackTransaction(this);
+
+        try {
+            // Send begin - no txId attached yet
+            final ResultSet rs = submitInternal("g.tx().begin()");
+            
+            // Server returns the transaction ID
+            this.transactionId = extractTransactionId(rs);
+            this.state = TransactionState.OPEN;
+        } catch (Exception e) {
+            cleanUp();
+            throw new RuntimeException("Failed to begin transaction: " + 
e.getMessage(), e);
+        }
+
+        // Create RemoteConnection for the traversal source
+        final TransactionRemoteConnection txConnection = new 
TransactionRemoteConnection(this);
+
+        try {
+            return 
traversalSourceClass.getConstructor(RemoteConnection.class).newInstance(txConnection);
+        } catch (Exception e) {
+            rollback();
+            throw new IllegalStateException("Failed to create 
TraversalSource", e);
+        }
+    }
+
+    /**
+     * Extracts the transaction ID from the begin() response.
+     * <p>
+     * The server returns the transaction ID as part of the response to 
g.tx().begin().
+     *
+     * @param rs the result set from the begin request
+     * @return the transaction ID
+     */
+    private String extractTransactionId(final ResultSet rs) {
+        // Wait for all results and extract the transaction ID
+        final List<Result> results = rs.all().join();
+        if (results.isEmpty()) {
+            throw new IllegalStateException("Server did not return transaction 
ID");
+        }
+        try {
+            final Object id = 
results.get(0).get(Map.class).get("transactionId");
+            if (id == null) throw new IllegalStateException("Server did not 
return transaction ID");
+
+            final String idStr = id.toString();
+            if (idStr.isBlank()) throw new IllegalStateException("Server 
returned empty transaction ID");
+
+            return idStr;
+        } catch (Exception e) {
+            throw new IllegalStateException("Server did not return transaction 
ID");
+        }
+    }
+
+    /**
+     * Commits the transaction.
+     * <p>
+     * Sends {@code g.tx().commit()} to the server and closes the transaction.
+     *
+     * @throws IllegalStateException if the transaction is not open
+     * @throws RuntimeException if the commit fails
+     */
+    @Override
+    public void commit() {
+        closeRemoteTransaction("g.tx().commit()");
+    }
+
+    /**
+     * Rolls back the transaction.
+     * <p>
+     * Sends {@code g.tx().rollback()} to the server and closes the 
transaction.
+     * This is best-effort - errors are logged but not thrown.
+     */
+    @Override
+    public void rollback() {
+        closeRemoteTransaction("g.tx().rollback()");
+    }
+
+    private void closeRemoteTransaction(final String closeScript) {
+        if (state != TransactionState.OPEN) throw new 
IllegalStateException("Transaction is not open");
+
+        try {
+            submitInternal(closeScript).all().get(CLOSING_MAX_WAIT_MS, 
TimeUnit.MILLISECONDS);
+            cleanUp();
+        } catch (Exception e) {
+            logger.warn("Failed to {} transaction on {}", closeScript, 
pinnedHost);
+            throw new TransactionException("Failed to " + closeScript, e);
+        }
+    }
+
+    private void cleanUp() {
+        state = TransactionState.CLOSED;
+        cluster.untrackTransaction(this);

Review Comment:
   Shouldn't we also close the `pinnedClient` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to