This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e490bfe781 IGNITE-20056 .NET: Track observable timestamp (#2426)
e490bfe781 is described below

commit e490bfe781dd0cdc57660e841307ecd8a6ffec20
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Aug 9 15:46:55 2023 +0300

    IGNITE-20056 .NET: Track observable timestamp (#2426)
    
    Track observable timestamp in .NET client:
    * Maintain latest value in `ClientFailoverSocket`
    * Send back to server in `TxBegin` and `SqlExec`
---
 .../dotnet/Apache.Ignite.Tests/.editorconfig       |  3 ++
 .../Apache.Ignite.Tests/ClientSocketTests.cs       | 23 +++++++--
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       | 20 +++++++-
 .../Transactions/TransactionsTests.cs              | 27 ++++++++++
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 60 +++++++++++++++-------
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  | 22 ++++----
 .../Internal/IClientSocketEventListener.cs         | 36 +++++++++++++
 .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs       |  3 +-
 .../Internal/Transactions/Transactions.cs          |  4 +-
 9 files changed, 159 insertions(+), 39 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig b/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
index 44f85e5015..3937bd3ecc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
@@ -46,3 +46,6 @@ dotnet_diagnostic.NUnit2005.severity = none # Consider using the constraint mode
 dotnet_diagnostic.NUnit2006.severity = none # Consider using the constraint model
 dotnet_diagnostic.NUnit2015.severity = none # Consider using the constraint model
 dotnet_diagnostic.NUnit2031.severity = none # Consider using the constraint model
+
+# ReSharper (refer to https://www.jetbrains.com/help/resharper/Reference__Code_Inspections_CSHARP.html)
+resharper_using_statement_resource_initialization_highlighting = none # Do not use object initializer for 'using' variable
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 1d873755b8..64d23585d5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -30,10 +30,12 @@ namespace Apache.Ignite.Tests
     /// </summary>
     public class ClientSocketTests : IgniteTestsBase
     {
+        private static readonly IClientSocketEventListener Listener = new 
NoOpListener();
+
         [Test]
         public async Task TestConnectAndSendRequestReturnsResponse()
         {
-            using var socket = await ClientSocket.ConnectAsync(GetEndPoint(), 
new(), _ => {});
+            using var socket = await ClientSocket.ConnectAsync(GetEndPoint(), 
new(), Listener);
 
             using var requestWriter = ProtoCommon.GetMessageWriter();
             requestWriter.MessageWriter.Write("non-existent-table");
@@ -45,7 +47,7 @@ namespace Apache.Ignite.Tests
         [Test]
         public async Task 
TestConnectAndSendRequestWithInvalidOpCodeThrowsError()
         {
-            using var socket = await ClientSocket.ConnectAsync(GetEndPoint(), 
new(), _ => {});
+            using var socket = await ClientSocket.ConnectAsync(GetEndPoint(), 
new(), Listener);
 
             using var requestWriter = ProtoCommon.GetMessageWriter();
             requestWriter.MessageWriter.Write(123);
@@ -59,7 +61,7 @@ namespace Apache.Ignite.Tests
         [Test]
         public async Task TestDisposedSocketThrowsExceptionOnSend()
         {
-            var socket = await ClientSocket.ConnectAsync(GetEndPoint(), new(), 
_ => {});
+            var socket = await ClientSocket.ConnectAsync(GetEndPoint(), new(), 
Listener);
 
             socket.Dispose();
 
@@ -78,10 +80,23 @@ namespace Apache.Ignite.Tests
         [Test]
         public void TestConnectWithoutServerThrowsException()
         {
-            Assert.CatchAsync(async () => await 
ClientSocket.ConnectAsync(GetEndPoint(569), new(), _ => { }));
+            Assert.CatchAsync(async () => await 
ClientSocket.ConnectAsync(GetEndPoint(569), new(), Listener));
         }
 
         private static SocketEndpoint GetEndPoint(int? serverPort = null) =>
             new(new(IPAddress.Loopback, serverPort ?? ServerPort), 
string.Empty);
+
+        private class NoOpListener : IClientSocketEventListener
+        {
+            public void OnAssignmentChanged(ClientSocket clientSocket)
+            {
+                // No-op.
+            }
+
+            public void OnObservableTimestampChanged(long timestamp)
+            {
+                // No-op.
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 81c668e199..ac1f46e962 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -120,6 +120,10 @@ namespace Apache.Ignite.Tests
 
         public int RequestCount { get; set; }
 
+        public long ObservableTimestamp { get; set; }
+
+        public long LastClientObservableTimestamp { get; set; }
+
         internal IList<ClientOp> ClientOps => _ops?.ToList() ?? throw new 
Exception("Ops tracking is disabled");
 
         public async Task<IIgniteClient> 
ConnectClientAsync(IgniteClientConfiguration? cfg = null)
@@ -281,6 +285,9 @@ namespace Apache.Ignite.Tests
                         continue;
 
                     case ClientOp.TxBegin:
+                        reader.Skip(); // Read only.
+                        LastClientObservableTimestamp = reader.ReadInt64();
+
                         Send(handler, requestId, new byte[] { 0 }.AsMemory());
                         continue;
 
@@ -339,7 +346,7 @@ namespace Apache.Ignite.Tests
             writer.Write(0); // Message type.
             writer.Write(requestId);
             writer.Write(PartitionAssignmentChanged ? 
(int)ResponseFlags.PartitionAssignmentChanged : 0);
-            writer.Write(0); // Observable timestamp.
+            writer.Write(ObservableTimestamp); // Observable timestamp.
 
             if (!isError)
             {
@@ -411,6 +418,17 @@ namespace Apache.Ignite.Tests
             var sql = reader.ReadString();
             props["sql"] = sql;
 
+            if (!reader.TryReadNil())
+            {
+                var argCount = reader.ReadInt32();
+                if (argCount > 0)
+                {
+                    reader.Skip();
+                }
+            }
+
+            LastClientObservableTimestamp = reader.ReadInt64();
+
             LastSql = sql;
             LastSqlPageSize = pageSize;
             LastSqlTimeoutMs = timeoutMs;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index c493020f54..d4e7da66c5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -283,6 +283,33 @@ namespace Apache.Ignite.Tests.Transactions
             Assert.AreEqual($"Transaction {{ Id = {id + 2}, State = Committed, 
IsReadOnly = False }}", tx3.ToString());
         }
 
+        [Test]
+        public async Task TestObservableTimestampPropagation([Values(true, 
false)] bool sql)
+        {
+            using var server = new FakeServer();
+            using var client = await server.ConnectClientAsync();
+
+            server.ObservableTimestamp = 123;
+
+            // Non-transactional operations do not propagate timestamp.
+            await client.Tables.GetTablesAsync();
+            await client.Tables.GetTablesAsync();
+
+            Assert.AreEqual(0, server.LastClientObservableTimestamp);
+
+            // Transactional operations propagate timestamp.
+            if (sql)
+            {
+                await client.Sql.ExecuteAsync(null, "select 1");
+            }
+            else
+            {
+                await client.Transactions.BeginAsync();
+            }
+
+            Assert.AreEqual(123, server.LastClientObservableTimestamp);
+        }
+
         private class CustomTx : ITransaction
         {
             public bool IsReadOnly => false;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 08a3140ec2..5f2e64f4df 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -38,7 +38,7 @@ namespace Apache.Ignite.Internal
     /// <summary>
     /// Client socket wrapper with reconnect/failover functionality.
     /// </summary>
-    internal sealed class ClientFailoverSocket : IDisposable
+    internal sealed class ClientFailoverSocket : IDisposable, 
IClientSocketEventListener
     {
         /** Current global endpoint index for Round-robin. */
         private static long _globalEndPointIndex;
@@ -75,6 +75,9 @@ namespace Apache.Ignite.Internal
         /** Local index for round-robin balancing within this FailoverSocket. 
*/
         private long _endPointIndex = Interlocked.Increment(ref 
_globalEndPointIndex);
 
+        /** Observable timestamp. */
+        private long _observableTimestamp;
+
         /// <summary>
         /// Initializes a new instance of the <see 
cref="ClientFailoverSocket"/> class.
         /// </summary>
@@ -104,6 +107,11 @@ namespace Apache.Ignite.Internal
         /// </summary>
         public int PartitionAssignmentVersion => 
Interlocked.CompareExchange(ref _assignmentVersion, -1, -1);
 
+        /// <summary>
+        /// Gets the observable timestamp.
+        /// </summary>
+        public long ObservableTimestamp => Interlocked.Read(ref 
_observableTimestamp);
+
         /// <summary>
         /// Connects the socket.
         /// </summary>
@@ -245,6 +253,38 @@ namespace Apache.Ignite.Internal
             return res;
         }
 
+        /// <inheritdoc/>
+        void IClientSocketEventListener.OnAssignmentChanged(ClientSocket 
clientSocket)
+        {
+            // NOTE: Multiple channels will send the same update to us, 
resulting in multiple cache invalidations.
+            // This could be solved with a cluster-wide AssignmentVersion, but 
we don't have that.
+            // So we only react to updates from the last known good channel. 
When no user-initiated operations are performed on that
+            // channel, heartbeat messages will trigger updates.
+            if (clientSocket == _lastConnectedSocket)
+            {
+                Interlocked.Increment(ref _assignmentVersion);
+            }
+        }
+
+        /// <inheritdoc/>
+        void IClientSocketEventListener.OnObservableTimestampChanged(long 
timestamp)
+        {
+            // Atomically update the observable timestamp to max(newTs, curTs).
+            while (true)
+            {
+                var current = Interlocked.Read(ref _observableTimestamp);
+                if (current >= timestamp)
+                {
+                    return;
+                }
+
+                if (Interlocked.CompareExchange(ref _observableTimestamp, 
timestamp, current) == current)
+                {
+                    return;
+                }
+            }
+        }
+
         /// <summary>
         /// Gets a socket. Reconnects if necessary.
         /// </summary>
@@ -412,7 +452,7 @@ namespace Apache.Ignite.Internal
 
             try
             {
-                var socket = await ClientSocket.ConnectAsync(endpoint, 
Configuration, OnAssignmentChanged).ConfigureAwait(false);
+                var socket = await ClientSocket.ConnectAsync(endpoint, 
Configuration, this).ConfigureAwait(false);
 
                 if (_clusterId == null)
                 {
@@ -440,22 +480,6 @@ namespace Apache.Ignite.Internal
             }
         }
 
-        /// <summary>
-        /// Called when an assignment update is detected.
-        /// </summary>
-        /// <param name="clientSocket">Socket.</param>
-        private void OnAssignmentChanged(ClientSocket clientSocket)
-        {
-            // NOTE: Multiple channels will send the same update to us, 
resulting in multiple cache invalidations.
-            // This could be solved with a cluster-wide AssignmentVersion, but 
we don't have that.
-            // So we only react to updates from the last known good channel. 
When no user-initiated operations are performed on that
-            // channel, heartbeat messages will trigger updates.
-            if (clientSocket == _lastConnectedSocket)
-            {
-                Interlocked.Increment(ref _assignmentVersion);
-            }
-        }
-
         /// <summary>
         /// Gets the endpoints: all combinations of IP addresses and ports 
according to configuration.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index d61dd176e0..2c5039eb8c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -92,8 +92,8 @@ namespace Apache.Ignite.Internal
         /** Logger. */
         private readonly IIgniteLogger? _logger;
 
-        /** Partition assignment change callback. */
-        private readonly Action<ClientSocket> _assignmentChangeCallback;
+        /** Event listener. */
+        private readonly IClientSocketEventListener _listener;
 
         /** Pre-allocated buffer for message size + op code + request id. To 
be used under <see cref="_sendLock"/>. */
         private readonly byte[] _prefixBuffer = new 
byte[ProtoCommon.MessagePrefixSize];
@@ -110,18 +110,18 @@ namespace Apache.Ignite.Internal
         /// <param name="stream">Network stream.</param>
         /// <param name="configuration">Configuration.</param>
         /// <param name="connectionContext">Connection context.</param>
-        /// <param name="assignmentChangeCallback">Partition assignment change 
callback.</param>
+        /// <param name="listener">Event listener.</param>
         /// <param name="logger">Logger.</param>
         private ClientSocket(
             Stream stream,
             IgniteClientConfiguration configuration,
             ConnectionContext connectionContext,
-            Action<ClientSocket> assignmentChangeCallback,
+            IClientSocketEventListener listener,
             IIgniteLogger? logger)
         {
             _stream = stream;
             ConnectionContext = connectionContext;
-            _assignmentChangeCallback = assignmentChangeCallback;
+            _listener = listener;
             _logger = logger;
             _socketTimeout = configuration.SocketTimeout;
 
@@ -154,7 +154,7 @@ namespace Apache.Ignite.Internal
         /// </summary>
         /// <param name="endPoint">Specific endpoint to connect to.</param>
         /// <param name="configuration">Configuration.</param>
-        /// <param name="assignmentChangeCallback">Partition assignment change 
callback.</param>
+        /// <param name="listener">Event listener.</param>
         /// <returns>A <see cref="Task{TResult}"/> representing the result of 
the asynchronous operation.</returns>
         [SuppressMessage(
             "Microsoft.Reliability",
@@ -163,7 +163,7 @@ namespace Apache.Ignite.Internal
         public static async Task<ClientSocket> ConnectAsync(
             SocketEndpoint endPoint,
             IgniteClientConfiguration configuration,
-            Action<ClientSocket> assignmentChangeCallback)
+            IClientSocketEventListener listener)
         {
             var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
             {
@@ -209,7 +209,7 @@ namespace Apache.Ignite.Internal
                     logger.Debug($"Handshake succeeded 
[remoteAddress={socket.RemoteEndPoint}]: {context}.");
                 }
 
-                return new ClientSocket(stream, configuration, context, 
assignmentChangeCallback, logger);
+                return new ClientSocket(stream, configuration, context, 
listener, logger);
             }
             catch (Exception e)
             {
@@ -697,11 +697,11 @@ namespace Apache.Ignite.Internal
                         $"Partition assignment change notification received 
[remoteAddress={ConnectionContext.ClusterNode.Address}]");
                 }
 
-                _assignmentChangeCallback(this);
+                _listener.OnAssignmentChanged(this);
             }
 
-            // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
-            _ = reader.ReadInt64();
+            var observableTimestamp = reader.ReadInt64();
+            _listener.OnObservableTimestampChanged(observableTimestamp);
 
             var exception = ReadError(ref reader);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs
new file mode 100644
index 0000000000..469afbfd2d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal;
+
+/// <summary>
+/// <see cref="ClientSocket"/> event listener.
+/// </summary>
+internal interface IClientSocketEventListener
+{
+    /// <summary>
+    /// Called when partition assignment changes.
+    /// </summary>
+    /// <param name="clientSocket">Source socket.</param>
+    void OnAssignmentChanged(ClientSocket clientSocket);
+
+    /// <summary>
+    /// Called when observable timestamp changes.
+    /// </summary>
+    /// <param name="timestamp">Timestamp.</param>
+    void OnObservableTimestampChanged(long timestamp);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index bc7a569d28..c7f544613b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -177,8 +177,7 @@ namespace Apache.Ignite.Internal.Sql
                 w.Write(statement.Query);
                 w.WriteObjectCollectionAsBinaryTuple(args);
 
-                // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
-                w.Write(0);
+                w.Write(_socket.ObservableTimestamp);
 
                 return writer;
             }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index 31129c944c..bf4698341c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -59,9 +59,7 @@ namespace Apache.Ignite.Internal.Transactions
             {
                 var w = writer.MessageWriter;
                 w.Write(options.ReadOnly);
-
-                // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
-                w.Write(0);
+                w.Write(_socket.ObservableTimestamp);
             }
         }
 

Reply via email to