This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d1e163a3b96 IGNITE-27427 .NET: Synchronize observableTs within 
IgniteClientGroup (#7285)
d1e163a3b96 is described below

commit d1e163a3b96dba5f70008a8f0d1487e1775c870f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Dec 23 14:20:34 2025 +0200

    IGNITE-27427 .NET: Synchronize observableTs within IgniteClientGroup (#7285)
    
    * Extract `HybridTimestampTracker`
    * Share one tracker across all clients in `IgniteClientGroup`
---
 .../Apache.Ignite.Tests/ClientSocketTests.cs       |  2 +-
 .../Apache.Ignite.Tests/IgniteClientGroupTests.cs  | 21 ++++++-
 .../platforms/dotnet/Apache.Ignite/IgniteClient.cs |  9 ++-
 .../dotnet/Apache.Ignite/IgniteClientGroup.cs      |  9 ++-
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 26 +++------
 .../Internal/HybridTimestampTracker.cs             | 64 ++++++++++++++++++++++
 .../Internal/IgniteClientConfigurationInternal.cs  |  4 +-
 7 files changed, 111 insertions(+), 24 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 22614e48d77..104d9fe1e39 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -87,7 +87,7 @@ namespace Apache.Ignite.Tests
             new(new(IPAddress.Loopback, serverPort ?? ServerPort), 
string.Empty, string.Empty);
 
         private static IgniteClientConfigurationInternal GetConfigInternal() =>
-            new(new(), Task.FromResult<IgniteApiAccessor>(null!), 
DnsResolver.Instance);
+            new(new(), Task.FromResult<IgniteApiAccessor>(null!), 
DnsResolver.Instance, new());
 
         private class NoOpListener : IClientSocketEventListener
         {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
index e4c25c10f5f..f3061c789f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Tests;
 
 using System;
 using System.Threading.Tasks;
+using Internal;
 using NUnit.Framework;
 
 /// <summary>
@@ -118,7 +119,7 @@ public class IgniteClientGroupTests
     [Test]
     public async Task TestToString()
     {
-        var group = CreateGroup(5);
+        using var group = CreateGroup(5);
 
         await group.GetIgniteAsync();
         await group.GetIgniteAsync();
@@ -129,7 +130,7 @@ public class IgniteClientGroupTests
     [Test]
     public void TestConfigurationCantBeChanged()
     {
-        IgniteClientGroup group = CreateGroup(3);
+        using IgniteClientGroup group = CreateGroup(3);
 
         var configuration = group.Configuration;
         configuration.Size = 100;
@@ -138,6 +139,22 @@ public class IgniteClientGroupTests
         Assert.AreNotSame(configuration, group.Configuration);
     }
 
+    [Test]
+    public async Task TestClientsShareObservableTimestamp()
+    {
+        _server.ObservableTimestamp = 123456789;
+        using IgniteClientGroup group = CreateGroup(size: 2);
+
+        var client1 = (IgniteClientInternal)await group.GetIgniteAsync();
+        var client2 = (IgniteClientInternal)await group.GetIgniteAsync();
+
+        Assert.AreNotSame(client1, client2);
+        Assert.AreSame(client1.Socket.Configuration.ObservableTimestamp, 
client2.Socket.Configuration.ObservableTimestamp);
+
+        Assert.AreEqual(123456789, client1.Socket.ObservableTimestamp);
+        Assert.AreEqual(123456789, client2.Socket.ObservableTimestamp);
+    }
+
     private IgniteClientGroup CreateGroup(int size = 1) =>
         new IgniteClientGroup(
             new IgniteClientGroupConfiguration
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs 
b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
index bcf4b64be39..5017e36e423 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
@@ -39,8 +39,12 @@ public static class IgniteClient
     /// </summary>
     /// <param name="configuration">Configuration.</param>
     /// <param name="dnsResolver">DNS resolver.</param>
+    /// <param name="hybridTs">Hybrid timestamp.</param>
     /// <returns>Started client.</returns>
-    internal static async Task<IIgniteClient> 
StartInternalAsync(IgniteClientConfiguration configuration, IDnsResolver 
dnsResolver)
+    internal static async Task<IIgniteClient> StartInternalAsync(
+        IgniteClientConfiguration configuration,
+        IDnsResolver dnsResolver,
+        HybridTimestampTracker? hybridTs = null)
     {
         IgniteArgumentCheck.NotNull(configuration);
 
@@ -48,7 +52,8 @@ public static class IgniteClient
         var internalConfig = new IgniteClientConfigurationInternal(
             new(configuration), // Defensive copy.
             apiTaskSource.Task,
-            dnsResolver);
+            dnsResolver,
+            hybridTs ?? new HybridTimestampTracker());
 
         var socket = await 
ClientFailoverSocket.ConnectAsync(internalConfig).ConfigureAwait(false);
         var client = new IgniteClientInternal(socket);
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs 
b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
index 978352d3d61..453b49ac39d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
@@ -60,6 +60,9 @@ public sealed class IgniteClientGroup : IDisposable
 
     private readonly SemaphoreSlim _clientsLock = new(1);
 
+    /** Shared hybrid timestamp tracker to ensure consistency across client 
instances. */
+    private readonly HybridTimestampTracker _hybridTs = new();
+
     private int _disposed;
 
     private int _clientIndex;
@@ -158,7 +161,11 @@ public sealed class IgniteClientGroup : IDisposable
 
     private async Task<IgniteClientInternal> CreateClientAsync()
     {
-        var client = await 
IgniteClient.StartAsync(Configuration.ClientConfiguration).ConfigureAwait(false);
+        var client = await IgniteClient.StartInternalAsync(
+                configuration: Configuration.ClientConfiguration,
+                dnsResolver: new DnsResolver(),
+                hybridTs: _hybridTs)
+            .ConfigureAwait(false);
 
         return (IgniteClientInternal)client;
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 96b41d8cacf..c4795308671 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -49,6 +49,9 @@ namespace Apache.Ignite.Internal
         /** Logger. */
         private readonly ILogger _logger;
 
+        /** Observable timestamp. */
+        private readonly HybridTimestampTracker _observableTimestamp;
+
         /** Cluster node unique name to endpoint map. */
         private readonly ConcurrentDictionary<string, SocketEndpoint> 
_endpointsByName = new();
 
@@ -82,9 +85,6 @@ namespace Apache.Ignite.Internal
         /** Local index for round-robin balancing within this FailoverSocket. 
*/
         private long _endPointIndex = Interlocked.Increment(ref 
_globalEndPointIndex);
 
-        /** Observable timestamp. */
-        private long _observableTimestamp;
-
         /// <summary>
         /// Initializes a new instance of the <see 
cref="ClientFailoverSocket"/> class.
         /// </summary>
@@ -104,6 +104,8 @@ namespace Apache.Ignite.Internal
             ClientId = Guid.NewGuid();
             ClientIdString = ClientId.ToString();
             Configuration = configuration;
+
+            _observableTimestamp = configuration.ObservableTimestamp;
         }
 
         /// <summary>
@@ -119,7 +121,7 @@ namespace Apache.Ignite.Internal
         /// <summary>
         /// Gets the observable timestamp.
         /// </summary>
-        public long ObservableTimestamp => Interlocked.Read(ref 
_observableTimestamp);
+        public long ObservableTimestamp => _observableTimestamp.Value;
 
         /// <summary>
         /// Gets the client ID.
@@ -370,20 +372,10 @@ namespace Apache.Ignite.Internal
         /// <inheritdoc/>
         void IClientSocketEventListener.OnObservableTimestampChanged(long 
timestamp)
         {
-            // Atomically update the observable timestamp to max(newTs, curTs).
-            while (true)
+            var prevVal = _observableTimestamp.GetAndUpdate(timestamp);
+            if (prevVal < timestamp)
             {
-                var current = Interlocked.Read(ref _observableTimestamp);
-                if (current >= timestamp)
-                {
-                    return;
-                }
-
-                if (Interlocked.CompareExchange(ref _observableTimestamp, 
timestamp, current) == current)
-                {
-                    _logger.LogObservableTsUpdatedTrace(prev: current, 
current: timestamp);
-                    return;
-                }
+                _logger.LogObservableTsUpdatedTrace(prev: prevVal, current: 
timestamp);
             }
         }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs
new file mode 100644
index 00000000000..764b84cbd1c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal;
+
+using System.Threading;
+
+/// <summary>
+/// Hybrid timestamp tracker.
+/// </summary>
+internal sealed class HybridTimestampTracker
+{
+    private long _val;
+
+    /// <summary>
+    /// Gets the current value.
+    /// </summary>
+    public long Value => Interlocked.Read(ref _val);
+
+    /// <summary>
+    /// Updates the timestamp to max(newVal, currentVal).
+    /// </summary>
+    /// <param name="newVal">New value.</param>
+    /// <returns>Previous value.</returns>
+    public long GetAndUpdate(long newVal)
+    {
+        // Atomically update the observable timestamp to max(newTs, curTs).
+        long current = Interlocked.Read(ref _val);
+
+        while (true)
+        {
+            if (current >= newVal)
+            {
+                // Already up to date or ahead.
+                return current;
+            }
+
+            long previous = Interlocked.CompareExchange(ref _val, newVal, 
current);
+
+            if (previous == current)
+            {
+                // Update succeeded.
+                return current;
+            }
+
+            // Update failed, another thread changed the value.
+            current = previous;
+        }
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
index 5a37b1499df..a036f8a42dc 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
@@ -25,7 +25,9 @@ using System.Threading.Tasks;
 /// <param name="Configuration">Configuration.</param>
 /// <param name="ApiTask">API accessor task.</param>
 /// <param name="DnsResolver">DNS resolver.</param>
+/// <param name="ObservableTimestamp">Observable timestamp.</param>
 internal sealed record IgniteClientConfigurationInternal(
     IgniteClientConfiguration Configuration,
     Task<IgniteApiAccessor> ApiTask,
-    IDnsResolver DnsResolver);
+    IDnsResolver DnsResolver,
+    HybridTimestampTracker ObservableTimestamp);

Reply via email to