This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d1e163a3b96 IGNITE-27427 .NET: Synchronize observableTs within
IgniteClientGroup (#7285)
d1e163a3b96 is described below
commit d1e163a3b96dba5f70008a8f0d1487e1775c870f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Dec 23 14:20:34 2025 +0200
IGNITE-27427 .NET: Synchronize observableTs within IgniteClientGroup (#7285)
* Extract `HybridTimestampTracker`
* Share one tracker across all clients in `IgniteClientGroup`
---
.../Apache.Ignite.Tests/ClientSocketTests.cs | 2 +-
.../Apache.Ignite.Tests/IgniteClientGroupTests.cs | 21 ++++++-
.../platforms/dotnet/Apache.Ignite/IgniteClient.cs | 9 ++-
.../dotnet/Apache.Ignite/IgniteClientGroup.cs | 9 ++-
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 26 +++------
.../Internal/HybridTimestampTracker.cs | 64 ++++++++++++++++++++++
.../Internal/IgniteClientConfigurationInternal.cs | 4 +-
7 files changed, 111 insertions(+), 24 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 22614e48d77..104d9fe1e39 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -87,7 +87,7 @@ namespace Apache.Ignite.Tests
new(new(IPAddress.Loopback, serverPort ?? ServerPort),
string.Empty, string.Empty);
private static IgniteClientConfigurationInternal GetConfigInternal() =>
- new(new(), Task.FromResult<IgniteApiAccessor>(null!),
DnsResolver.Instance);
+ new(new(), Task.FromResult<IgniteApiAccessor>(null!),
DnsResolver.Instance, new());
private class NoOpListener : IClientSocketEventListener
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
index e4c25c10f5f..f3061c789f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Tests;
using System;
using System.Threading.Tasks;
+using Internal;
using NUnit.Framework;
/// <summary>
@@ -118,7 +119,7 @@ public class IgniteClientGroupTests
[Test]
public async Task TestToString()
{
- var group = CreateGroup(5);
+ using var group = CreateGroup(5);
await group.GetIgniteAsync();
await group.GetIgniteAsync();
@@ -129,7 +130,7 @@ public class IgniteClientGroupTests
[Test]
public void TestConfigurationCantBeChanged()
{
- IgniteClientGroup group = CreateGroup(3);
+ using IgniteClientGroup group = CreateGroup(3);
var configuration = group.Configuration;
configuration.Size = 100;
@@ -138,6 +139,22 @@ public class IgniteClientGroupTests
Assert.AreNotSame(configuration, group.Configuration);
}
+ [Test]
+ public async Task TestClientsShareObservableTimestamp()
+ {
+ _server.ObservableTimestamp = 123456789;
+ using IgniteClientGroup group = CreateGroup(size: 2);
+
+ var client1 = (IgniteClientInternal)await group.GetIgniteAsync();
+ var client2 = (IgniteClientInternal)await group.GetIgniteAsync();
+
+ Assert.AreNotSame(client1, client2);
+ Assert.AreSame(client1.Socket.Configuration.ObservableTimestamp,
client2.Socket.Configuration.ObservableTimestamp);
+
+ Assert.AreEqual(123456789, client1.Socket.ObservableTimestamp);
+ Assert.AreEqual(123456789, client2.Socket.ObservableTimestamp);
+ }
+
private IgniteClientGroup CreateGroup(int size = 1) =>
new IgniteClientGroup(
new IgniteClientGroupConfiguration
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
index bcf4b64be39..5017e36e423 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs
@@ -39,8 +39,12 @@ public static class IgniteClient
/// </summary>
/// <param name="configuration">Configuration.</param>
/// <param name="dnsResolver">DNS resolver.</param>
+ /// <param name="hybridTs">Hybrid timestamp.</param>
/// <returns>Started client.</returns>
- internal static async Task<IIgniteClient>
StartInternalAsync(IgniteClientConfiguration configuration, IDnsResolver
dnsResolver)
+ internal static async Task<IIgniteClient> StartInternalAsync(
+ IgniteClientConfiguration configuration,
+ IDnsResolver dnsResolver,
+ HybridTimestampTracker? hybridTs = null)
{
IgniteArgumentCheck.NotNull(configuration);
@@ -48,7 +52,8 @@ public static class IgniteClient
var internalConfig = new IgniteClientConfigurationInternal(
new(configuration), // Defensive copy.
apiTaskSource.Task,
- dnsResolver);
+ dnsResolver,
+ hybridTs ?? new HybridTimestampTracker());
var socket = await
ClientFailoverSocket.ConnectAsync(internalConfig).ConfigureAwait(false);
var client = new IgniteClientInternal(socket);
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
index 978352d3d61..453b49ac39d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs
@@ -60,6 +60,9 @@ public sealed class IgniteClientGroup : IDisposable
private readonly SemaphoreSlim _clientsLock = new(1);
+ /** Shared hybrid timestamp tracker to ensure consistency across client
instances. */
+ private readonly HybridTimestampTracker _hybridTs = new();
+
private int _disposed;
private int _clientIndex;
@@ -158,7 +161,11 @@ public sealed class IgniteClientGroup : IDisposable
private async Task<IgniteClientInternal> CreateClientAsync()
{
- var client = await
IgniteClient.StartAsync(Configuration.ClientConfiguration).ConfigureAwait(false);
+ var client = await IgniteClient.StartInternalAsync(
+ configuration: Configuration.ClientConfiguration,
+ dnsResolver: new DnsResolver(),
+ hybridTs: _hybridTs)
+ .ConfigureAwait(false);
return (IgniteClientInternal)client;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 96b41d8cacf..c4795308671 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -49,6 +49,9 @@ namespace Apache.Ignite.Internal
/** Logger. */
private readonly ILogger _logger;
+ /** Observable timestamp. */
+ private readonly HybridTimestampTracker _observableTimestamp;
+
/** Cluster node unique name to endpoint map. */
private readonly ConcurrentDictionary<string, SocketEndpoint>
_endpointsByName = new();
@@ -82,9 +85,6 @@ namespace Apache.Ignite.Internal
/** Local index for round-robin balancing within this FailoverSocket.
*/
private long _endPointIndex = Interlocked.Increment(ref
_globalEndPointIndex);
- /** Observable timestamp. */
- private long _observableTimestamp;
-
/// <summary>
/// Initializes a new instance of the <see
cref="ClientFailoverSocket"/> class.
/// </summary>
@@ -104,6 +104,8 @@ namespace Apache.Ignite.Internal
ClientId = Guid.NewGuid();
ClientIdString = ClientId.ToString();
Configuration = configuration;
+
+ _observableTimestamp = configuration.ObservableTimestamp;
}
/// <summary>
@@ -119,7 +121,7 @@ namespace Apache.Ignite.Internal
/// <summary>
/// Gets the observable timestamp.
/// </summary>
- public long ObservableTimestamp => Interlocked.Read(ref
_observableTimestamp);
+ public long ObservableTimestamp => _observableTimestamp.Value;
/// <summary>
/// Gets the client ID.
@@ -370,20 +372,10 @@ namespace Apache.Ignite.Internal
/// <inheritdoc/>
void IClientSocketEventListener.OnObservableTimestampChanged(long
timestamp)
{
- // Atomically update the observable timestamp to max(newTs, curTs).
- while (true)
+ var prevVal = _observableTimestamp.GetAndUpdate(timestamp);
+ if (prevVal < timestamp)
{
- var current = Interlocked.Read(ref _observableTimestamp);
- if (current >= timestamp)
- {
- return;
- }
-
- if (Interlocked.CompareExchange(ref _observableTimestamp,
timestamp, current) == current)
- {
- _logger.LogObservableTsUpdatedTrace(prev: current,
current: timestamp);
- return;
- }
+ _logger.LogObservableTsUpdatedTrace(prev: prevVal, current:
timestamp);
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs
new file mode 100644
index 00000000000..764b84cbd1c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/HybridTimestampTracker.cs
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal;
+
+using System.Threading;
+
+/// <summary>
+/// Hybrid timestamp tracker.
+/// </summary>
+internal sealed class HybridTimestampTracker
+{
+ private long _val;
+
+ /// <summary>
+ /// Gets the current value.
+ /// </summary>
+ public long Value => Interlocked.Read(ref _val);
+
+ /// <summary>
+ /// Updates the timestamp to max(newVal, currentVal).
+ /// </summary>
+ /// <param name="newVal">New value.</param>
+ /// <returns>Previous value.</returns>
+ public long GetAndUpdate(long newVal)
+ {
+ // Atomically update the observable timestamp to max(newTs, curTs).
+ long current = Interlocked.Read(ref _val);
+
+ while (true)
+ {
+ if (current >= newVal)
+ {
+ // Already up to date or ahead.
+ return current;
+ }
+
+ long previous = Interlocked.CompareExchange(ref _val, newVal,
current);
+
+ if (previous == current)
+ {
+ // Update succeeded.
+ return current;
+ }
+
+ // Update failed, another thread changed the value.
+ current = previous;
+ }
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
index 5a37b1499df..a036f8a42dc 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientConfigurationInternal.cs
@@ -25,7 +25,9 @@ using System.Threading.Tasks;
/// <param name="Configuration">Configuration.</param>
/// <param name="ApiTask">API accessor task.</param>
/// <param name="DnsResolver">DNS resolver.</param>
+/// <param name="ObservableTimestamp">Observable timestamp.</param>
internal sealed record IgniteClientConfigurationInternal(
IgniteClientConfiguration Configuration,
Task<IgniteApiAccessor> ApiTask,
- IDnsResolver DnsResolver);
+ IDnsResolver DnsResolver,
+ HybridTimestampTracker ObservableTimestamp);