This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cb94b212762 IGNITE-27592 .NET: Support COMPUTE_OBSERVABLE_TS feature (#7629)
cb94b212762 is described below

commit cb94b21276288bd6a7be56406795b2f7c2e5cb42
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Feb 19 12:57:18 2026 +0200

    IGNITE-27592 .NET: Support COMPUTE_OBSERVABLE_TS feature (#7629)
    
    Propagate observableTs to compute jobs when supported by the server.
---
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  35 ++++-
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |   3 +-
 .../Apache.Ignite/Internal/Compute/Compute.cs      | 163 ++++++++++-----------
 .../Internal/Compute/ComputePacker.cs              |   8 +-
 .../Internal/Proto/ProtocolBitmaskFeature.cs       |   5 +
 5 files changed, 121 insertions(+), 93 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 7e018f422b5..10ee0717e53 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -191,7 +191,7 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
-        public async Task TestAllSupportedArgTypes()
+        public async Task TestAllSupportedArgTypes([Values(true, false)] bool 
colocated)
         {
             await Test(sbyte.MinValue);
             await Test(sbyte.MaxValue);
@@ -224,7 +224,22 @@ namespace Apache.Ignite.Tests.Compute
             await Test(new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
11, 12, 13, 14, 15, 16 }));
             await Test(Guid.NewGuid());
 
+            await Test(new IgniteTuple { ["foo"] = "bar", ["baz"] = 42 }, 
"TupleImpl [FOO=bar, BAZ=42]");
+
             async Task Test(object val, string? expectedStr = null)
+            {
+                if (colocated)
+                {
+                    await Test0(val, expectedStr, 
JobTarget.Colocated(TableName, 1L));
+                }
+                else
+                {
+                    await Test0(val, expectedStr, JobTarget.AnyNode(await 
Client.GetClusterNodesAsync()));
+                }
+            }
+
+            async Task Test0<TTarget>(object val, string? expectedStr, 
IJobTarget<TTarget> target)
+                where TTarget : notnull
             {
                 var nodes = JobTarget.AnyNode(await 
Client.GetClusterNodesAsync());
 
@@ -994,7 +1009,7 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
-        public async Task TestCustomMarshaller()
+        public async Task TestCustomMarshaller([Values(true, false)] bool 
colocated)
         {
             var job = new JobDescriptor<Nested, Nested>(PlatformTestNodeRunner 
+ "$ToStringMarshallerJob")
             {
@@ -1004,16 +1019,22 @@ namespace Apache.Ignite.Tests.Compute
 
             var arg = new Nested(Guid.NewGuid(), 1.234m);
 
-            var exec = await Client.Compute.SubmitAsync(await GetNodeAsync(1), 
job, arg);
-            Nested res = await exec.GetResultAsync();
-
-            var nullExec = await Client.Compute.SubmitAsync(await 
GetNodeAsync(1), job, null!);
-            Nested nullRes = await nullExec.GetResultAsync();
+            Nested res = await ExecJob(arg);
+            Nested nullRes = await ExecJob(null);
 
             Assert.AreEqual(arg.Id, res.Id);
             Assert.AreEqual(arg.Price + 1, res.Price);
 
             Assert.IsNull(nullRes);
+
+            async Task<Nested> ExecJob(Nested? arg0)
+            {
+                var jobExec = colocated
+                    ? await 
Client.Compute.SubmitAsync(JobTarget.Colocated(TableName, 1L), job, arg0!)
+                    : await Client.Compute.SubmitAsync(await GetNodeAsync(1), 
job, arg0!);
+
+                return await jobExec.GetResultAsync();
+            }
         }
 
         [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 4c4484f0667..06ceed22d0d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -53,7 +53,8 @@ namespace Apache.Ignite.Internal
             ProtocolBitmaskFeature.PlatformComputeExecutor |
             ProtocolBitmaskFeature.StreamerReceiverExecutionOptions |
             ProtocolBitmaskFeature.SqlPartitionAwareness |
-            ProtocolBitmaskFeature.SqlPartitionAwarenessTableName;
+            ProtocolBitmaskFeature.SqlPartitionAwarenessTableName |
+            ProtocolBitmaskFeature.ComputeObservableTs;
 
         /** Features as a byte array */
         private static readonly byte[] FeatureBytes = Features.ToBytes();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 863e0d57690..f8553fc4b6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -130,23 +130,27 @@ namespace Apache.Ignite.Internal.Compute
             IgniteArgumentCheck.NotNull(taskDescriptor.TaskClassName);
 
             using var writer = ProtoCommon.GetMessageWriter();
-            Write();
 
-            using PooledBuffer res = await _socket.DoOutInOpAsync(
-                    ClientOp.ComputeExecuteMapReduce, writer, 
expectNotifications: true, cancellationToken: cancellationToken)
-                .ConfigureAwait(false);
+            using var res = await _socket.DoOutInOpAndGetSocketAsync(
+                ClientOp.ComputeExecuteMapReduce,
+                tx: null,
+                arg: (writer, taskDescriptor, arg, _socket, cancellationToken),
+                requestWriter: static (socket, args) =>
+                {
+                    args.writer.Reset();
+                    var w = args.writer.MessageWriter;
 
-            return GetTaskExecution<TResult>(res, cancellationToken);
+                    WriteUnits(args.taskDescriptor.DeploymentUnits, 
args.writer);
+                    w.Write(args.taskDescriptor.TaskClassName);
 
-            void Write()
-            {
-                var w = writer.MessageWriter;
+                    ComputePacker.PackArgOrResult(ref w, args.arg, null, 
GetObservableTimestamp(socket, args._socket));
 
-                WriteUnits(taskDescriptor.DeploymentUnits, writer);
-                w.Write(taskDescriptor.TaskClassName);
+                    return args.writer;
+                },
+                expectNotifications: true,
+                cancellationToken: cancellationToken).ConfigureAwait(false);
 
-                ComputePacker.PackArgOrResult(ref w, arg, null);
-            }
+            return GetTaskExecution<TResult>(res.Buffer, cancellationToken);
         }
 
         /// <inheritdoc/>
@@ -285,6 +289,16 @@ namespace Apache.Ignite.Internal.Compute
         private static bool CanWriteJobExecType(ClientSocket socket) =>
             
socket.ConnectionContext.ServerHasFeature(ProtocolBitmaskFeature.PlatformComputeJob);
 
+        private static long? GetObservableTimestamp(ClientSocket socket, 
ClientFailoverSocket failoverSocket)
+        {
+            if 
(!socket.ConnectionContext.ServerHasFeature(ProtocolBitmaskFeature.ComputeObservableTs))
+            {
+                return null;
+            }
+
+            return failoverSocket.ObservableTimestamp;
+        }
+
         private static JobState ReadJobState(MsgPackReader reader)
         {
             var id = reader.ReadGuid();
@@ -443,36 +457,31 @@ namespace Apache.Ignite.Internal.Compute
             TArg arg,
             CancellationToken cancellationToken)
         {
-            IClusterNode node = GetRandomNode(nodes);
-
-            using var buf = await _socket.DoWithRetryAsync(
-                    (nodes, jobDescriptor, arg, cancellationToken),
-                    static (_, _) => ClientOp.ComputeExecute,
-                    async static (socket, args) =>
-                    {
-                        using var writer = ProtoCommon.GetMessageWriter();
-                        Write(writer, args, CanWriteJobExecType(socket));
-
-                        return await socket.DoOutInOpAsync(
-                            ClientOp.ComputeExecute, writer, 
expectNotifications: true, cancellationToken: args.cancellationToken)
-                            .ConfigureAwait(false);
-                    },
-                    PreferredNode.FromName(node.Name))
-                .ConfigureAwait(false);
+            using var writer = ProtoCommon.GetMessageWriter();
 
-            return GetJobExecution(buf, readSchema: false, 
jobDescriptor.ResultMarshaller, cancellationToken);
+            using var res = await _socket.DoOutInOpAndGetSocketAsync(
+                ClientOp.ComputeExecute,
+                tx: null,
+                arg: (writer, nodes, jobDescriptor, arg, _socket, 
cancellationToken),
+                requestWriter: static (socket, args) =>
+                {
+                    args.writer.Reset();
+                    var w = args.writer.MessageWriter;
 
-            static void Write(
-                PooledArrayBuffer writer,
-                (ICollection<IClusterNode> Nodes, JobDescriptor<TArg, TResult> 
Desc, TArg Arg, CancellationToken Ct) args,
-                bool canWriteJobExecType)
-            {
-                WriteNodeNames(writer, args.Nodes);
-                WriteJob(writer, args.Desc, canWriteJobExecType);
+                    WriteNodeNames(args.writer, args.nodes);
+                    WriteJob(args.writer, args.jobDescriptor, 
CanWriteJobExecType(socket));
 
-                var w = writer.MessageWriter;
-                ComputePacker.PackArgOrResult(ref w, args.Arg, 
args.Desc.ArgMarshaller);
-            }
+                    ComputePacker.PackArgOrResult(
+                        ref w, args.arg, args.jobDescriptor.ArgMarshaller, 
GetObservableTimestamp(socket, args._socket));
+
+                    return args.writer;
+                },
+                PreferredNode.FromName(GetRandomNode(nodes).Name),
+                expectNotifications: true,
+                cancellationToken: cancellationToken)
+                .ConfigureAwait(false);
+
+            return GetJobExecution(res.Buffer, readSchema: false, 
jobDescriptor.ResultMarshaller, cancellationToken);
         }
 
         private async Task<Table> GetTableAsync(QualifiedName tableName)
@@ -514,33 +523,37 @@ namespace Apache.Ignite.Internal.Compute
 
                 try
                 {
-                    // Write the job executor type optimistically, compute 
hash.
-                    using var bufferWriter = ProtoCommon.GetMessageWriter();
-                    var colocationHash = Write(bufferWriter, table, schema, 
key, serializerHandlerFunc, descriptor, arg, true);
+                    var serializerHandler = serializerHandlerFunc(table);
+                    int colocationHash = ComputeKeyHash(serializerHandler, 
schema, key);
                     var preferredNode = await 
table.GetPreferredNode(colocationHash, null).ConfigureAwait(false);
+                    using var writer = ProtoCommon.GetMessageWriter();
 
-                    using var resBuf = await _socket.DoWithRetryAsync(
-                            (table, schema, key, serializerHandlerFunc, 
descriptor, arg, bufferWriter, cancellationToken),
-                            static (_, _) => ClientOp.ComputeExecuteColocated,
-                            async static (socket, args) =>
-                            {
-                                if (CanWriteJobExecType(socket))
-                                {
-                                    return await 
socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, args.bufferWriter, 
expectNotifications: true, cancellationToken: args.cancellationToken)
-                                        .ConfigureAwait(false);
-                                }
-
-                                // Rewrite the message without a job executor 
type.
-                                using var writer = 
ProtoCommon.GetMessageWriter();
-                                Write(writer, args.table, args.schema, 
args.key, args.serializerHandlerFunc, args.descriptor, args.arg, false);
-
-                                return await 
socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, writer, 
expectNotifications: true, cancellationToken: args.cancellationToken)
-                                    .ConfigureAwait(false);
-                            },
-                            preferredNode)
-                        .ConfigureAwait(false);
+                    using var res = await _socket.DoOutInOpAndGetSocketAsync(
+                        ClientOp.ComputeExecuteColocated,
+                        tx: null,
+                        arg: (writer, table, schema, key, serializerHandler, 
descriptor, arg, _socket, cancellationToken),
+                        requestWriter: static (socket, args) =>
+                        {
+                            args.writer.Reset();
+                            var w = args.writer.MessageWriter;
+
+                            w.Write(args.table.Id);
+                            w.Write(args.schema.Version);
+
+                            args.serializerHandler.Write(ref w, args.schema, 
args.key, keyOnly: true, computeHash: false);
+
+                            WriteJob(args.writer, args.descriptor, 
CanWriteJobExecType(socket));
 
-                    return GetJobExecution(resBuf, readSchema: true, 
marshaller: descriptor.ResultMarshaller, cancellationToken);
+                            ComputePacker.PackArgOrResult(
+                                ref w, args.arg, 
args.descriptor.ArgMarshaller, GetObservableTimestamp(socket, args._socket));
+
+                            return args.writer;
+                        },
+                        preferredNode: preferredNode,
+                        expectNotifications: true,
+                        cancellationToken: 
cancellationToken).ConfigureAwait(false);
+
+                    return GetJobExecution(res.Buffer, readSchema: true, 
marshaller: descriptor.ResultMarshaller, cancellationToken);
                 }
                 catch (IgniteException e) when (e.Code == 
ErrorGroups.Client.TableIdNotFound)
                 {
@@ -561,29 +574,11 @@ namespace Apache.Ignite.Internal.Compute
                 }
             }
 
-            static int Write(
-                PooledArrayBuffer bufferWriter,
-                Table table,
-                Schema schema,
-                TKey key,
-                Func<Table, IRecordSerializerHandler<TKey>> 
serializerHandlerFunc,
-                JobDescriptor<TArg, TResult> descriptor,
-                TArg arg,
-                bool canWriteJobExecType)
+            static int ComputeKeyHash(IRecordSerializerHandler<TKey> 
serializerHandler, Schema schema, TKey key)
             {
+                using var bufferWriter = ProtoCommon.GetMessageWriter();
                 var w = bufferWriter.MessageWriter;
-
-                w.Write(table.Id);
-                w.Write(schema.Version);
-
-                var serializerHandler = serializerHandlerFunc(table);
-                var colocationHash = serializerHandler.Write(ref w, schema, 
key, keyOnly: true, computeHash: true);
-
-                WriteJob(bufferWriter, descriptor, canWriteJobExecType);
-
-                w.WriteObjectAsBinaryTuple(arg);
-
-                return colocationHash;
+                return serializerHandler.Write(ref w, schema, key, keyOnly: 
true, computeHash: true);
             }
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
index 0198924c163..7d5fbb54d0f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -50,9 +50,15 @@ internal static class ComputePacker
     /// <param name="w">Packer.</param>
     /// <param name="obj">Arg.</param>
     /// <param name="marshaller">Marshaller.</param>
+    /// <param name="observableTimestamp">Observable timestamp. Not packed 
when null.</param>
     /// <typeparam name="T">Arg type.</typeparam>
-    internal static void PackArgOrResult<T>(ref MsgPackWriter w, T obj, 
IMarshaller<T>? marshaller)
+    internal static void PackArgOrResult<T>(ref MsgPackWriter w, T obj, 
IMarshaller<T>? marshaller, long? observableTimestamp = null)
     {
+        if (observableTimestamp != null)
+        {
+            w.Write(observableTimestamp.Value);
+        }
+
         if (obj == null)
         {
             w.WriteNil();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
index e75d6b81100..e516ec318c5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
@@ -60,6 +60,11 @@ internal enum ProtocolBitmaskFeature
     /// </summary>
     SqlPartitionAwareness = 1 << 9,
 
+    /// <summary>
+    /// Compute tasks and jobs accept observable timestamp from the client.
+    /// </summary>
+    ComputeObservableTs = 1 << 14,
+
     /// <summary>
     /// Partition awareness for SQL requests with table name in metadata.
     /// </summary>

Reply via email to