This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e490bfe781 IGNITE-20056 .NET: Track observable timestamp (#2426)
e490bfe781 is described below
commit e490bfe781dd0cdc57660e841307ecd8a6ffec20
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Aug 9 15:46:55 2023 +0300
IGNITE-20056 .NET: Track observable timestamp (#2426)
Track observable timestamp in .NET client:
* Maintain latest value in `ClientFailoverSocket`
* Send back to server in `TxBegin` and `SqlExec`
---
.../dotnet/Apache.Ignite.Tests/.editorconfig | 3 ++
.../Apache.Ignite.Tests/ClientSocketTests.cs | 23 +++++++--
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 20 +++++++-
.../Transactions/TransactionsTests.cs | 27 ++++++++++
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 60 +++++++++++++++-------
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 22 ++++----
.../Internal/IClientSocketEventListener.cs | 36 +++++++++++++
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 3 +-
.../Internal/Transactions/Transactions.cs | 4 +-
9 files changed, 159 insertions(+), 39 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
b/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
index 44f85e5015..3937bd3ecc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/.editorconfig
@@ -46,3 +46,6 @@ dotnet_diagnostic.NUnit2005.severity = none # Consider using
the constraint mode
dotnet_diagnostic.NUnit2006.severity = none # Consider using the constraint
model
dotnet_diagnostic.NUnit2015.severity = none # Consider using the constraint
model
dotnet_diagnostic.NUnit2031.severity = none # Consider using the constraint
model
+
+# ReSharper (refer to
https://www.jetbrains.com/help/resharper/Reference__Code_Inspections_CSHARP.html)
+resharper_using_statement_resource_initialization_highlighting = none # Do not
use object initializer for 'using' variable
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index 1d873755b8..64d23585d5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -30,10 +30,12 @@ namespace Apache.Ignite.Tests
/// </summary>
public class ClientSocketTests : IgniteTestsBase
{
+ private static readonly IClientSocketEventListener Listener = new
NoOpListener();
+
[Test]
public async Task TestConnectAndSendRequestReturnsResponse()
{
- using var socket = await ClientSocket.ConnectAsync(GetEndPoint(),
new(), _ => {});
+ using var socket = await ClientSocket.ConnectAsync(GetEndPoint(),
new(), Listener);
using var requestWriter = ProtoCommon.GetMessageWriter();
requestWriter.MessageWriter.Write("non-existent-table");
@@ -45,7 +47,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task
TestConnectAndSendRequestWithInvalidOpCodeThrowsError()
{
- using var socket = await ClientSocket.ConnectAsync(GetEndPoint(),
new(), _ => {});
+ using var socket = await ClientSocket.ConnectAsync(GetEndPoint(),
new(), Listener);
using var requestWriter = ProtoCommon.GetMessageWriter();
requestWriter.MessageWriter.Write(123);
@@ -59,7 +61,7 @@ namespace Apache.Ignite.Tests
[Test]
public async Task TestDisposedSocketThrowsExceptionOnSend()
{
- var socket = await ClientSocket.ConnectAsync(GetEndPoint(), new(),
_ => {});
+ var socket = await ClientSocket.ConnectAsync(GetEndPoint(), new(),
Listener);
socket.Dispose();
@@ -78,10 +80,23 @@ namespace Apache.Ignite.Tests
[Test]
public void TestConnectWithoutServerThrowsException()
{
- Assert.CatchAsync(async () => await
ClientSocket.ConnectAsync(GetEndPoint(569), new(), _ => { }));
+ Assert.CatchAsync(async () => await
ClientSocket.ConnectAsync(GetEndPoint(569), new(), Listener));
}
private static SocketEndpoint GetEndPoint(int? serverPort = null) =>
new(new(IPAddress.Loopback, serverPort ?? ServerPort),
string.Empty);
+
+ private class NoOpListener : IClientSocketEventListener
+ {
+ public void OnAssignmentChanged(ClientSocket clientSocket)
+ {
+ // No-op.
+ }
+
+ public void OnObservableTimestampChanged(long timestamp)
+ {
+ // No-op.
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 81c668e199..ac1f46e962 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -120,6 +120,10 @@ namespace Apache.Ignite.Tests
public int RequestCount { get; set; }
+ public long ObservableTimestamp { get; set; }
+
+ public long LastClientObservableTimestamp { get; set; }
+
internal IList<ClientOp> ClientOps => _ops?.ToList() ?? throw new
Exception("Ops tracking is disabled");
public async Task<IIgniteClient>
ConnectClientAsync(IgniteClientConfiguration? cfg = null)
@@ -281,6 +285,9 @@ namespace Apache.Ignite.Tests
continue;
case ClientOp.TxBegin:
+ reader.Skip(); // Skip the read-only flag.
+ LastClientObservableTimestamp = reader.ReadInt64();
+
Send(handler, requestId, new byte[] { 0 }.AsMemory());
continue;
@@ -339,7 +346,7 @@ namespace Apache.Ignite.Tests
writer.Write(0); // Message type.
writer.Write(requestId);
writer.Write(PartitionAssignmentChanged ?
(int)ResponseFlags.PartitionAssignmentChanged : 0);
- writer.Write(0); // Observable timestamp.
+ writer.Write(ObservableTimestamp); // Observable timestamp.
if (!isError)
{
@@ -411,6 +418,17 @@ namespace Apache.Ignite.Tests
var sql = reader.ReadString();
props["sql"] = sql;
+ if (!reader.TryReadNil())
+ {
+ var argCount = reader.ReadInt32();
+ if (argCount > 0)
+ {
+ reader.Skip();
+ }
+ }
+
+ LastClientObservableTimestamp = reader.ReadInt64();
+
LastSql = sql;
LastSqlPageSize = pageSize;
LastSqlTimeoutMs = timeoutMs;
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index c493020f54..d4e7da66c5 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -283,6 +283,33 @@ namespace Apache.Ignite.Tests.Transactions
Assert.AreEqual($"Transaction {{ Id = {id + 2}, State = Committed,
IsReadOnly = False }}", tx3.ToString());
}
+ [Test]
+ public async Task TestObservableTimestampPropagation([Values(true,
false)] bool sql)
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ server.ObservableTimestamp = 123;
+
+ // Non-transactional operations do not propagate timestamp.
+ await client.Tables.GetTablesAsync();
+ await client.Tables.GetTablesAsync();
+
+ Assert.AreEqual(0, server.LastClientObservableTimestamp);
+
+ // Transactional operations propagate timestamp.
+ if (sql)
+ {
+ await client.Sql.ExecuteAsync(null, "select 1");
+ }
+ else
+ {
+ await client.Transactions.BeginAsync();
+ }
+
+ Assert.AreEqual(123, server.LastClientObservableTimestamp);
+ }
+
private class CustomTx : ITransaction
{
public bool IsReadOnly => false;
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index 08a3140ec2..5f2e64f4df 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -38,7 +38,7 @@ namespace Apache.Ignite.Internal
/// <summary>
/// Client socket wrapper with reconnect/failover functionality.
/// </summary>
- internal sealed class ClientFailoverSocket : IDisposable
+ internal sealed class ClientFailoverSocket : IDisposable,
IClientSocketEventListener
{
/** Current global endpoint index for Round-robin. */
private static long _globalEndPointIndex;
@@ -75,6 +75,9 @@ namespace Apache.Ignite.Internal
/** Local index for round-robin balancing within this FailoverSocket.
*/
private long _endPointIndex = Interlocked.Increment(ref
_globalEndPointIndex);
+ /** Observable timestamp. */
+ private long _observableTimestamp;
+
/// <summary>
/// Initializes a new instance of the <see
cref="ClientFailoverSocket"/> class.
/// </summary>
@@ -104,6 +107,11 @@ namespace Apache.Ignite.Internal
/// </summary>
public int PartitionAssignmentVersion =>
Interlocked.CompareExchange(ref _assignmentVersion, -1, -1);
+ /// <summary>
+ /// Gets the observable timestamp.
+ /// </summary>
+ public long ObservableTimestamp => Interlocked.Read(ref
_observableTimestamp);
+
/// <summary>
/// Connects the socket.
/// </summary>
@@ -245,6 +253,38 @@ namespace Apache.Ignite.Internal
return res;
}
+ /// <inheritdoc/>
+ void IClientSocketEventListener.OnAssignmentChanged(ClientSocket
clientSocket)
+ {
+ // NOTE: Multiple channels will send the same update to us,
resulting in multiple cache invalidations.
+ // This could be solved with a cluster-wide AssignmentVersion, but
we don't have that.
+ // So we only react to updates from the last known good channel.
When no user-initiated operations are performed on that
+ // channel, heartbeat messages will trigger updates.
+ if (clientSocket == _lastConnectedSocket)
+ {
+ Interlocked.Increment(ref _assignmentVersion);
+ }
+ }
+
+ /// <inheritdoc/>
+ void IClientSocketEventListener.OnObservableTimestampChanged(long
timestamp)
+ {
+ // Atomically update the observable timestamp to max(newTs, curTs).
+ while (true)
+ {
+ var current = Interlocked.Read(ref _observableTimestamp);
+ if (current >= timestamp)
+ {
+ return;
+ }
+
+ if (Interlocked.CompareExchange(ref _observableTimestamp,
timestamp, current) == current)
+ {
+ return;
+ }
+ }
+ }
+
/// <summary>
/// Gets a socket. Reconnects if necessary.
/// </summary>
@@ -412,7 +452,7 @@ namespace Apache.Ignite.Internal
try
{
- var socket = await ClientSocket.ConnectAsync(endpoint,
Configuration, OnAssignmentChanged).ConfigureAwait(false);
+ var socket = await ClientSocket.ConnectAsync(endpoint,
Configuration, this).ConfigureAwait(false);
if (_clusterId == null)
{
@@ -440,22 +480,6 @@ namespace Apache.Ignite.Internal
}
}
- /// <summary>
- /// Called when an assignment update is detected.
- /// </summary>
- /// <param name="clientSocket">Socket.</param>
- private void OnAssignmentChanged(ClientSocket clientSocket)
- {
- // NOTE: Multiple channels will send the same update to us,
resulting in multiple cache invalidations.
- // This could be solved with a cluster-wide AssignmentVersion, but
we don't have that.
- // So we only react to updates from the last known good channel.
When no user-initiated operations are performed on that
- // channel, heartbeat messages will trigger updates.
- if (clientSocket == _lastConnectedSocket)
- {
- Interlocked.Increment(ref _assignmentVersion);
- }
- }
-
/// <summary>
/// Gets the endpoints: all combinations of IP addresses and ports
according to configuration.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index d61dd176e0..2c5039eb8c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -92,8 +92,8 @@ namespace Apache.Ignite.Internal
/** Logger. */
private readonly IIgniteLogger? _logger;
- /** Partition assignment change callback. */
- private readonly Action<ClientSocket> _assignmentChangeCallback;
+ /** Event listener. */
+ private readonly IClientSocketEventListener _listener;
/** Pre-allocated buffer for message size + op code + request id. To
be used under <see cref="_sendLock"/>. */
private readonly byte[] _prefixBuffer = new
byte[ProtoCommon.MessagePrefixSize];
@@ -110,18 +110,18 @@ namespace Apache.Ignite.Internal
/// <param name="stream">Network stream.</param>
/// <param name="configuration">Configuration.</param>
/// <param name="connectionContext">Connection context.</param>
- /// <param name="assignmentChangeCallback">Partition assignment change
callback.</param>
+ /// <param name="listener">Event listener.</param>
/// <param name="logger">Logger.</param>
private ClientSocket(
Stream stream,
IgniteClientConfiguration configuration,
ConnectionContext connectionContext,
- Action<ClientSocket> assignmentChangeCallback,
+ IClientSocketEventListener listener,
IIgniteLogger? logger)
{
_stream = stream;
ConnectionContext = connectionContext;
- _assignmentChangeCallback = assignmentChangeCallback;
+ _listener = listener;
_logger = logger;
_socketTimeout = configuration.SocketTimeout;
@@ -154,7 +154,7 @@ namespace Apache.Ignite.Internal
/// </summary>
/// <param name="endPoint">Specific endpoint to connect to.</param>
/// <param name="configuration">Configuration.</param>
- /// <param name="assignmentChangeCallback">Partition assignment change
callback.</param>
+ /// <param name="listener">Event listener.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of
the asynchronous operation.</returns>
[SuppressMessage(
"Microsoft.Reliability",
@@ -163,7 +163,7 @@ namespace Apache.Ignite.Internal
public static async Task<ClientSocket> ConnectAsync(
SocketEndpoint endPoint,
IgniteClientConfiguration configuration,
- Action<ClientSocket> assignmentChangeCallback)
+ IClientSocketEventListener listener)
{
var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
{
@@ -209,7 +209,7 @@ namespace Apache.Ignite.Internal
logger.Debug($"Handshake succeeded
[remoteAddress={socket.RemoteEndPoint}]: {context}.");
}
- return new ClientSocket(stream, configuration, context,
assignmentChangeCallback, logger);
+ return new ClientSocket(stream, configuration, context,
listener, logger);
}
catch (Exception e)
{
@@ -697,11 +697,11 @@ namespace Apache.Ignite.Internal
$"Partition assignment change notification received
[remoteAddress={ConnectionContext.ClusterNode.Address}]");
}
- _assignmentChangeCallback(this);
+ _listener.OnAssignmentChanged(this);
}
- // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
- _ = reader.ReadInt64();
+ var observableTimestamp = reader.ReadInt64();
+ _listener.OnObservableTimestampChanged(observableTimestamp);
var exception = ReadError(ref reader);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs
new file mode 100644
index 0000000000..469afbfd2d
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/IClientSocketEventListener.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal;
+
+/// <summary>
+/// <see cref="ClientSocket"/> event listener.
+/// </summary>
+internal interface IClientSocketEventListener
+{
+ /// <summary>
+ /// Called when partition assignment changes.
+ /// </summary>
+ /// <param name="clientSocket">Source socket.</param>
+ void OnAssignmentChanged(ClientSocket clientSocket);
+
+ /// <summary>
+ /// Called when observable timestamp changes.
+ /// </summary>
+ /// <param name="timestamp">Timestamp.</param>
+ void OnObservableTimestampChanged(long timestamp);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index bc7a569d28..c7f544613b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -177,8 +177,7 @@ namespace Apache.Ignite.Internal.Sql
w.Write(statement.Query);
w.WriteObjectCollectionAsBinaryTuple(args);
- // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
- w.Write(0);
+ w.Write(_socket.ObservableTimestamp);
return writer;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index 31129c944c..bf4698341c 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -59,9 +59,7 @@ namespace Apache.Ignite.Internal.Transactions
{
var w = writer.MessageWriter;
w.Write(options.ReadOnly);
-
- // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
- w.Write(0);
+ w.Write(_socket.ObservableTimestamp);
}
}