kenhuuu commented on code in PR #3434:
URL: https://github.com/apache/tinkerpop/pull/3434#discussion_r3375548151


##########
gremlin-dotnet/src/Gremlin.Net/Driver/RemoteTransaction.cs:
##########
@@ -0,0 +1,270 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Gremlin.Net.Driver.Messages;
+using Gremlin.Net.Driver.Remote;
+using Gremlin.Net.Process.Traversal;
+
+namespace Gremlin.Net.Driver
+{
+    /// <summary>
+    ///     Controls an explicit remote transaction. Created via
+    ///     <c>GremlinClient.Transact()</c> or <c>g.Tx()</c>.
+    ///     The transaction is not started until <see cref="BeginAsync"/> is 
called.
+    ///
+    ///     All submissions are serialized internally via a semaphore to 
guarantee
+    ///     the server receives requests in order, even if the caller does not 
await
+    ///     each call before issuing the next.
+    ///
+    ///     This class is NOT thread-safe. Do not share a RemoteTransaction 
across
+    ///     multiple threads without external synchronization.
+    /// </summary>
+    public class RemoteTransaction : IGremlinClient, IAsyncDisposable
+    {
+        private readonly IGremlinClient _client;
+        private readonly string _traversalSource;
+        // Serializes all submissions to guarantee ordering. Matches the role 
of
+        // Java's synchronized on HttpRemoteTransaction.submitInternal.
+        private readonly SemaphoreSlim _submitLock = new(1, 1);
+        private string? _transactionId;
+        private bool _isOpen;
+        private bool _failed;
+        private TransactionRemoteConnection? _txConnection;
+
+        internal RemoteTransaction(IGremlinClient client, string 
traversalSource)
+        {
+            _client = client ?? throw new 
ArgumentNullException(nameof(client));
+            _traversalSource = traversalSource ?? throw new 
ArgumentNullException(nameof(traversalSource));
+        }
+
+        /// <summary>
+        ///     Gets the server-generated transaction ID, or null if the 
transaction has not yet been started.
+        /// </summary>
+        public string? TransactionId => _transactionId;
+
+        /// <summary>
+        ///     Gets whether the transaction is currently open.
+        /// </summary>
+        public bool IsOpen => _isOpen;
+
+        /// <summary>
+        ///     Starts the transaction and returns a transaction-bound <see 
cref="GraphTraversalSource"/>.
+        /// </summary>
+        /// <param name="cancellationToken">The token to cancel the 
operation.</param>
+        /// <returns>A <see cref="GraphTraversalSource"/> bound to this 
transaction.</returns>
+        /// <exception cref="InvalidOperationException">Thrown if the 
transaction is already started.</exception>
+        public async Task<GraphTraversalSource> BeginAsync(CancellationToken 
cancellationToken = default)
+        {
+            if (_isOpen || _failed)
+            {
+                throw new InvalidOperationException("Transaction already 
started");
+            }
+
+            var requestMsg = RequestMessage.Build("g.tx().begin()")
+                .AddG(_traversalSource)
+                .Create();
+
+            await 
_submitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+            try
+            {
+                List<object> results;
+                try
+                {
+                    var resultSet = await 
_client.SubmitAsync<object>(requestMsg, cancellationToken)
+                        .ConfigureAwait(false);
+                    results = await 
resultSet.ToListAsync(cancellationToken).ConfigureAwait(false);
+                }
+                catch
+                {
+                    _failed = true;
+                    throw;
+                }
+
+                if (results.Count == 0)
+                {
+                    _failed = true;
+                    throw new InvalidOperationException("Server did not return 
transaction ID");
+                }
+
+                if (results[0] is Dictionary<object, object> resultMap &&
+                    resultMap.TryGetValue("transactionId", out var txIdObj) &&
+                    txIdObj is string txId && !string.IsNullOrEmpty(txId))
+                {
+                    _transactionId = txId;
+                }
+                else
+                {
+                    _failed = true;
+                    throw new InvalidOperationException("Server did not return 
transaction ID in expected format");
+                }
+
+                _isOpen = true;
+            }
+            finally
+            {
+                _submitLock.Release();
+            }
+
+            _txConnection = new TransactionRemoteConnection(_client, 
_traversalSource, _transactionId, this);
+            return new GraphTraversalSource(
+                new List<ITraversalStrategy>(),
+                new GremlinLang(),
+                _txConnection);
+        }
+
+        /// <summary>
+        ///     Commits the transaction.
+        /// </summary>
+        /// <param name="cancellationToken">The token to cancel the 
operation.</param>
+        /// <exception cref="InvalidOperationException">Thrown if the 
transaction is not open.</exception>
+        public async Task CommitAsync(CancellationToken cancellationToken = 
default)
+        {
+            await CloseTransactionAsync("g.tx().commit()", 
cancellationToken).ConfigureAwait(false);
+        }
+
+        /// <summary>
+        ///     Rolls back the transaction.
+        /// </summary>
+        /// <param name="cancellationToken">The token to cancel the 
operation.</param>
+        /// <exception cref="InvalidOperationException">Thrown if the 
transaction is not open.</exception>
+        public async Task RollbackAsync(CancellationToken cancellationToken = 
default)
+        {
+            await CloseTransactionAsync("g.tx().rollback()", 
cancellationToken).ConfigureAwait(false);
+        }
+
+        private async Task CloseTransactionAsync(string script, 
CancellationToken cancellationToken)
+        {
+            if (!_isOpen)
+            {
+                throw new InvalidOperationException("Transaction is not open");
+            }
+
+            var requestMsg = RequestMessage.Build(script)
+                .AddG(_traversalSource)
+                .AddField(Tokens.ArgsTransactionId, _transactionId!)
+                .Create();
+
+            await 
_submitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+            try
+            {
+                var resultSet = await _client.SubmitAsync<object>(requestMsg, 
cancellationToken).ConfigureAwait(false);
+                // Drain the result to surface any GraphBinary-level errors 
from the response body
+                await 
resultSet.ToListAsync(cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                _submitLock.Release();
+            }
+            _isOpen = false;
+            _failed = true; // Terminal state: transaction cannot be reused
+            _txConnection?.MarkClosed();
+        }
+
+        /// <summary>
+        ///     Submits a <see cref="RequestMessage"/> within this transaction.
+        ///     Submissions are serialized to guarantee the server receives 
them in order.
+        /// </summary>
+        public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage,
+            CancellationToken cancellationToken = default)
+        {
+            if (!_isOpen)
+            {
+                throw new InvalidOperationException("Transaction is not open");
+            }
+
+            await 
_submitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+            try
+            {
+                return await _client.SubmitAsync<T>(requestMessage, 
cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                _submitLock.Release();
+            }
+        }
+
+        /// <summary>
+        ///     Submits a plain gremlin-lang string within this transaction.
+        ///     The transactionId is automatically attached.
+        /// </summary>
+        /// <param name="gremlin">The Gremlin query string.</param>
+        /// <param name="cancellationToken">The token to cancel the 
operation.</param>
+        /// <returns>A <see cref="ResultSet{T}"/> containing the 
results.</returns>
+        /// <exception cref="InvalidOperationException">Thrown if the 
transaction is not open.</exception>
+        public async Task<ResultSet<T>> SubmitAsync<T>(string gremlin, 
CancellationToken cancellationToken = default)
+        {
+            if (!_isOpen)
+            {
+                throw new InvalidOperationException("Transaction is not open");
+            }
+
+            var requestMsg = RequestMessage.Build(gremlin)
+                .AddG(_traversalSource)
+                .AddField(Tokens.ArgsTransactionId, _transactionId!)
+                .Create();
+
+            await 
_submitLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+            try
+            {
+                return await _client.SubmitAsync<T>(requestMsg, 
cancellationToken).ConfigureAwait(false);
+            }
+            finally
+            {
+                _submitLock.Release();
+            }
+        }
+
+        /// <summary>
+        ///     Disposes the transaction asynchronously. Default behavior is 
rollback.
+        /// </summary>
+        public async ValueTask DisposeAsync()
+        {
+            if (_isOpen)
+            {
+                try
+                {
+                    await RollbackAsync().ConfigureAwait(false);
+                }
+                catch
+                {
+                    _isOpen = false;
+                    _failed = true;
+                    _txConnection?.MarkClosed();
+                }
+            }
+        }
+
+        /// <summary>
+        ///     Synchronous dispose (required by IGremlinClient/IDisposable).
+        ///     Does not attempt rollback. Use <c>await using</c> for proper 
cleanup.
+        /// </summary>
+        public void Dispose()

Review Comment:
   This is actually a consequence of the interface of `IGremlinClient` which is 
`IDisposable`. Because I wanted to extend the same interface, it meant that 
`RemoteTransaction` also needs to implement a `Dispose` but its not safe to 
rollback there because of how certain .NET runtimes with a 
`SynchronizationContext` behave. This means that .NET users will have to use 
`await using` rather than `using`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to