This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cb94b212762 IGNITE-27592 .NET: Support COMPUTE_OBSERVABLE_TS feature (#7629)
cb94b212762 is described below
commit cb94b21276288bd6a7be56406795b2f7c2e5cb42
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Feb 19 12:57:18 2026 +0200
IGNITE-27592 .NET: Support COMPUTE_OBSERVABLE_TS feature (#7629)
Propagate the client's observable timestamp (observableTs) to compute jobs and map-reduce tasks when the server supports the COMPUTE_OBSERVABLE_TS feature.
---
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 35 ++++-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 3 +-
.../Apache.Ignite/Internal/Compute/Compute.cs | 163 ++++++++++-----------
.../Internal/Compute/ComputePacker.cs | 8 +-
.../Internal/Proto/ProtocolBitmaskFeature.cs | 5 +
5 files changed, 121 insertions(+), 93 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 7e018f422b5..10ee0717e53 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -191,7 +191,7 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
- public async Task TestAllSupportedArgTypes()
+ public async Task TestAllSupportedArgTypes([Values(true, false)] bool
colocated)
{
await Test(sbyte.MinValue);
await Test(sbyte.MaxValue);
@@ -224,7 +224,22 @@ namespace Apache.Ignite.Tests.Compute
await Test(new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16 }));
await Test(Guid.NewGuid());
+ await Test(new IgniteTuple { ["foo"] = "bar", ["baz"] = 42 },
"TupleImpl [FOO=bar, BAZ=42]");
+
async Task Test(object val, string? expectedStr = null)
+ {
+ if (colocated)
+ {
+ await Test0(val, expectedStr,
JobTarget.Colocated(TableName, 1L));
+ }
+ else
+ {
+ await Test0(val, expectedStr, JobTarget.AnyNode(await
Client.GetClusterNodesAsync()));
+ }
+ }
+
+ async Task Test0<TTarget>(object val, string? expectedStr,
IJobTarget<TTarget> target)
+ where TTarget : notnull
{
var nodes = JobTarget.AnyNode(await
Client.GetClusterNodesAsync());
@@ -994,7 +1009,7 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
- public async Task TestCustomMarshaller()
+ public async Task TestCustomMarshaller([Values(true, false)] bool
colocated)
{
var job = new JobDescriptor<Nested, Nested>(PlatformTestNodeRunner
+ "$ToStringMarshallerJob")
{
@@ -1004,16 +1019,22 @@ namespace Apache.Ignite.Tests.Compute
var arg = new Nested(Guid.NewGuid(), 1.234m);
- var exec = await Client.Compute.SubmitAsync(await GetNodeAsync(1),
job, arg);
- Nested res = await exec.GetResultAsync();
-
- var nullExec = await Client.Compute.SubmitAsync(await
GetNodeAsync(1), job, null!);
- Nested nullRes = await nullExec.GetResultAsync();
+ Nested res = await ExecJob(arg);
+ Nested nullRes = await ExecJob(null);
Assert.AreEqual(arg.Id, res.Id);
Assert.AreEqual(arg.Price + 1, res.Price);
Assert.IsNull(nullRes);
+
+ async Task<Nested> ExecJob(Nested? arg0)
+ {
+ var jobExec = colocated
+ ? await
Client.Compute.SubmitAsync(JobTarget.Colocated(TableName, 1L), job, arg0!)
+ : await Client.Compute.SubmitAsync(await GetNodeAsync(1),
job, arg0!);
+
+ return await jobExec.GetResultAsync();
+ }
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 4c4484f0667..06ceed22d0d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -53,7 +53,8 @@ namespace Apache.Ignite.Internal
ProtocolBitmaskFeature.PlatformComputeExecutor |
ProtocolBitmaskFeature.StreamerReceiverExecutionOptions |
ProtocolBitmaskFeature.SqlPartitionAwareness |
- ProtocolBitmaskFeature.SqlPartitionAwarenessTableName;
+ ProtocolBitmaskFeature.SqlPartitionAwarenessTableName |
+ ProtocolBitmaskFeature.ComputeObservableTs;
/** Features as a byte array */
private static readonly byte[] FeatureBytes = Features.ToBytes();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 863e0d57690..f8553fc4b6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -130,23 +130,27 @@ namespace Apache.Ignite.Internal.Compute
IgniteArgumentCheck.NotNull(taskDescriptor.TaskClassName);
using var writer = ProtoCommon.GetMessageWriter();
- Write();
- using PooledBuffer res = await _socket.DoOutInOpAsync(
- ClientOp.ComputeExecuteMapReduce, writer,
expectNotifications: true, cancellationToken: cancellationToken)
- .ConfigureAwait(false);
+ using var res = await _socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.ComputeExecuteMapReduce,
+ tx: null,
+ arg: (writer, taskDescriptor, arg, _socket, cancellationToken),
+ requestWriter: static (socket, args) =>
+ {
+ args.writer.Reset();
+ var w = args.writer.MessageWriter;
- return GetTaskExecution<TResult>(res, cancellationToken);
+ WriteUnits(args.taskDescriptor.DeploymentUnits,
args.writer);
+ w.Write(args.taskDescriptor.TaskClassName);
- void Write()
- {
- var w = writer.MessageWriter;
+ ComputePacker.PackArgOrResult(ref w, args.arg, null,
GetObservableTimestamp(socket, args._socket));
- WriteUnits(taskDescriptor.DeploymentUnits, writer);
- w.Write(taskDescriptor.TaskClassName);
+ return args.writer;
+ },
+ expectNotifications: true,
+ cancellationToken: cancellationToken).ConfigureAwait(false);
- ComputePacker.PackArgOrResult(ref w, arg, null);
- }
+ return GetTaskExecution<TResult>(res.Buffer, cancellationToken);
}
/// <inheritdoc/>
@@ -285,6 +289,16 @@ namespace Apache.Ignite.Internal.Compute
private static bool CanWriteJobExecType(ClientSocket socket) =>
socket.ConnectionContext.ServerHasFeature(ProtocolBitmaskFeature.PlatformComputeJob);
+ private static long? GetObservableTimestamp(ClientSocket socket,
ClientFailoverSocket failoverSocket)
+ {
+ if
(!socket.ConnectionContext.ServerHasFeature(ProtocolBitmaskFeature.ComputeObservableTs))
+ {
+ return null;
+ }
+
+ return failoverSocket.ObservableTimestamp;
+ }
+
private static JobState ReadJobState(MsgPackReader reader)
{
var id = reader.ReadGuid();
@@ -443,36 +457,31 @@ namespace Apache.Ignite.Internal.Compute
TArg arg,
CancellationToken cancellationToken)
{
- IClusterNode node = GetRandomNode(nodes);
-
- using var buf = await _socket.DoWithRetryAsync(
- (nodes, jobDescriptor, arg, cancellationToken),
- static (_, _) => ClientOp.ComputeExecute,
- async static (socket, args) =>
- {
- using var writer = ProtoCommon.GetMessageWriter();
- Write(writer, args, CanWriteJobExecType(socket));
-
- return await socket.DoOutInOpAsync(
- ClientOp.ComputeExecute, writer,
expectNotifications: true, cancellationToken: args.cancellationToken)
- .ConfigureAwait(false);
- },
- PreferredNode.FromName(node.Name))
- .ConfigureAwait(false);
+ using var writer = ProtoCommon.GetMessageWriter();
- return GetJobExecution(buf, readSchema: false,
jobDescriptor.ResultMarshaller, cancellationToken);
+ using var res = await _socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.ComputeExecute,
+ tx: null,
+ arg: (writer, nodes, jobDescriptor, arg, _socket,
cancellationToken),
+ requestWriter: static (socket, args) =>
+ {
+ args.writer.Reset();
+ var w = args.writer.MessageWriter;
- static void Write(
- PooledArrayBuffer writer,
- (ICollection<IClusterNode> Nodes, JobDescriptor<TArg, TResult>
Desc, TArg Arg, CancellationToken Ct) args,
- bool canWriteJobExecType)
- {
- WriteNodeNames(writer, args.Nodes);
- WriteJob(writer, args.Desc, canWriteJobExecType);
+ WriteNodeNames(args.writer, args.nodes);
+ WriteJob(args.writer, args.jobDescriptor,
CanWriteJobExecType(socket));
- var w = writer.MessageWriter;
- ComputePacker.PackArgOrResult(ref w, args.Arg,
args.Desc.ArgMarshaller);
- }
+ ComputePacker.PackArgOrResult(
+ ref w, args.arg, args.jobDescriptor.ArgMarshaller,
GetObservableTimestamp(socket, args._socket));
+
+ return args.writer;
+ },
+ PreferredNode.FromName(GetRandomNode(nodes).Name),
+ expectNotifications: true,
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ return GetJobExecution(res.Buffer, readSchema: false,
jobDescriptor.ResultMarshaller, cancellationToken);
}
private async Task<Table> GetTableAsync(QualifiedName tableName)
@@ -514,33 +523,37 @@ namespace Apache.Ignite.Internal.Compute
try
{
- // Write the job executor type optimistically, compute
hash.
- using var bufferWriter = ProtoCommon.GetMessageWriter();
- var colocationHash = Write(bufferWriter, table, schema,
key, serializerHandlerFunc, descriptor, arg, true);
+ var serializerHandler = serializerHandlerFunc(table);
+ int colocationHash = ComputeKeyHash(serializerHandler,
schema, key);
var preferredNode = await
table.GetPreferredNode(colocationHash, null).ConfigureAwait(false);
+ using var writer = ProtoCommon.GetMessageWriter();
- using var resBuf = await _socket.DoWithRetryAsync(
- (table, schema, key, serializerHandlerFunc,
descriptor, arg, bufferWriter, cancellationToken),
- static (_, _) => ClientOp.ComputeExecuteColocated,
- async static (socket, args) =>
- {
- if (CanWriteJobExecType(socket))
- {
- return await
socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, args.bufferWriter,
expectNotifications: true, cancellationToken: args.cancellationToken)
- .ConfigureAwait(false);
- }
-
- // Rewrite the message without a job executor
type.
- using var writer =
ProtoCommon.GetMessageWriter();
- Write(writer, args.table, args.schema,
args.key, args.serializerHandlerFunc, args.descriptor, args.arg, false);
-
- return await
socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, writer,
expectNotifications: true, cancellationToken: args.cancellationToken)
- .ConfigureAwait(false);
- },
- preferredNode)
- .ConfigureAwait(false);
+ using var res = await _socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.ComputeExecuteColocated,
+ tx: null,
+ arg: (writer, table, schema, key, serializerHandler,
descriptor, arg, _socket, cancellationToken),
+ requestWriter: static (socket, args) =>
+ {
+ args.writer.Reset();
+ var w = args.writer.MessageWriter;
+
+ w.Write(args.table.Id);
+ w.Write(args.schema.Version);
+
+ args.serializerHandler.Write(ref w, args.schema,
args.key, keyOnly: true, computeHash: false);
+
+ WriteJob(args.writer, args.descriptor,
CanWriteJobExecType(socket));
- return GetJobExecution(resBuf, readSchema: true,
marshaller: descriptor.ResultMarshaller, cancellationToken);
+ ComputePacker.PackArgOrResult(
+ ref w, args.arg,
args.descriptor.ArgMarshaller, GetObservableTimestamp(socket, args._socket));
+
+ return args.writer;
+ },
+ preferredNode: preferredNode,
+ expectNotifications: true,
+ cancellationToken:
cancellationToken).ConfigureAwait(false);
+
+ return GetJobExecution(res.Buffer, readSchema: true,
marshaller: descriptor.ResultMarshaller, cancellationToken);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Client.TableIdNotFound)
{
@@ -561,29 +574,11 @@ namespace Apache.Ignite.Internal.Compute
}
}
- static int Write(
- PooledArrayBuffer bufferWriter,
- Table table,
- Schema schema,
- TKey key,
- Func<Table, IRecordSerializerHandler<TKey>>
serializerHandlerFunc,
- JobDescriptor<TArg, TResult> descriptor,
- TArg arg,
- bool canWriteJobExecType)
+ static int ComputeKeyHash(IRecordSerializerHandler<TKey>
serializerHandler, Schema schema, TKey key)
{
+ using var bufferWriter = ProtoCommon.GetMessageWriter();
var w = bufferWriter.MessageWriter;
-
- w.Write(table.Id);
- w.Write(schema.Version);
-
- var serializerHandler = serializerHandlerFunc(table);
- var colocationHash = serializerHandler.Write(ref w, schema,
key, keyOnly: true, computeHash: true);
-
- WriteJob(bufferWriter, descriptor, canWriteJobExecType);
-
- w.WriteObjectAsBinaryTuple(arg);
-
- return colocationHash;
+ return serializerHandler.Write(ref w, schema, key, keyOnly:
true, computeHash: true);
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
index 0198924c163..7d5fbb54d0f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -50,9 +50,15 @@ internal static class ComputePacker
/// <param name="w">Packer.</param>
/// <param name="obj">Arg.</param>
/// <param name="marshaller">Marshaller.</param>
+ /// <param name="observableTimestamp">Observable timestamp. Not packed
when null.</param>
/// <typeparam name="T">Arg type.</typeparam>
- internal static void PackArgOrResult<T>(ref MsgPackWriter w, T obj,
IMarshaller<T>? marshaller)
+ internal static void PackArgOrResult<T>(ref MsgPackWriter w, T obj,
IMarshaller<T>? marshaller, long? observableTimestamp = null)
{
+ if (observableTimestamp != null)
+ {
+ w.Write(observableTimestamp.Value);
+ }
+
if (obj == null)
{
w.WriteNil();
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
index e75d6b81100..e516ec318c5 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
@@ -60,6 +60,11 @@ internal enum ProtocolBitmaskFeature
/// </summary>
SqlPartitionAwareness = 1 << 9,
+ /// <summary>
+ /// Compute tasks and jobs accept observable timestamp from the client.
+ /// </summary>
+ ComputeObservableTs = 1 << 14,
+
/// <summary>
/// Partition awareness for SQL requests with table name in metadata.
/// </summary>