This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6989419b03  IGNITE-24197 .NET: Unify Compute API (#5050)
6989419b03 is described below

commit 6989419b03f9c69c4e10568399a9c4ae21fc8037
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jan 16 15:45:26 2025 +0200

     IGNITE-24197 .NET: Unify Compute API (#5050)
    
    Use Target and Execution objects in all Compute API methods.
    * Add `IBroadcastJobTarget`
    * Add `IBroadcastExecution`
    * Remove `CancelAsync` from `JobExecution` and `TaskExecution` (use 
`CancellationToken` for cancellation - to be added separately)
    
    Reflects Java API changes from IGNITE-24174 (#5016)
---
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    | 48 +++++++++---------
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       | 33 ++++++++----
 .../Proto/ColocationHashTests.cs                   |  2 +-
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |  4 +-
 .../Apache.Ignite/Compute/BroadcastJobTarget.cs    | 58 ++++++++++++++++++++++
 .../Apache.Ignite/Compute/IBroadcastExecution.cs   | 32 ++++++++++++
 .../Apache.Ignite/Compute/IBroadcastJobTarget.cs   | 31 ++++++++++++
 .../dotnet/Apache.Ignite/Compute/ICompute.cs       | 18 +++----
 .../dotnet/Apache.Ignite/Compute/IJobExecution.cs  | 15 +++---
 .../dotnet/Apache.Ignite/Compute/ITaskExecution.cs |  9 ----
 .../Internal/Compute/BroadcastExecution.cs         | 28 +++++++++++
 .../Apache.Ignite/Internal/Compute/Compute.cs      | 47 ++++++++++++------
 .../Apache.Ignite/Internal/Compute/JobExecution.cs | 13 +++--
 .../Internal/Compute/TaskExecution.cs              |  4 --
 14 files changed, 255 insertions(+), 87 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 44586787ba..de2ab5d7f6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -133,17 +133,15 @@ namespace Apache.Ignite.Tests.Compute
         {
             var node = (await GetNodeAsync(0)).Data;
 
-            IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = 
Client.Compute.SubmitBroadcast(
-                new[] { node },
+            var broadcastExec = await Client.Compute.SubmitBroadcastAsync(
+                BroadcastJobTarget.Nodes(node),
                 NodeNameJob,
                 "123");
 
-            var res = await taskMap[node];
+            var jobExec = broadcastExec.JobExecutions.Single();
 
-            Assert.AreEqual(1, taskMap.Count);
-            Assert.AreSame(node, taskMap.Keys.Single());
-
-            Assert.AreEqual(PlatformTestNodeRunner + "123", await 
res.GetResultAsync());
+            Assert.AreEqual(node, jobExec.Node);
+            Assert.AreEqual(PlatformTestNodeRunner + "123", await 
jobExec.GetResultAsync());
         }
 
         [Test]
@@ -151,15 +149,17 @@ namespace Apache.Ignite.Tests.Compute
         {
             var nodes = await Client.GetClusterNodesAsync();
 
-            IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = 
Client.Compute.SubmitBroadcast(
-                nodes,
+            IBroadcastExecution<string> broadcastExecution = await 
Client.Compute.SubmitBroadcastAsync(
+                BroadcastJobTarget.Nodes(nodes),
                 NodeNameJob,
                 "123");
 
-            var res1 = await taskMap[nodes[0]];
-            var res2 = await taskMap[nodes[1]];
-            var res3 = await taskMap[nodes[2]];
-            var res4 = await taskMap[nodes[3]];
+            var taskMap = broadcastExecution.JobExecutions.ToDictionary(x => 
x.Node);
+
+            IJobExecution<string> res1 = taskMap[nodes[0]];
+            IJobExecution<string> res2 = taskMap[nodes[1]];
+            IJobExecution<string> res3 = taskMap[nodes[2]];
+            IJobExecution<string> res4 = taskMap[nodes[3]];
 
             Assert.AreEqual(4, taskMap.Count);
 
@@ -212,10 +212,8 @@ namespace Apache.Ignite.Tests.Compute
         {
             var unknownNode = new ClusterNode(Guid.NewGuid(), "y", new 
IPEndPoint(IPAddress.Loopback, 0));
 
-            IDictionary<IClusterNode, Task<IJobExecution<object>>> taskMap =
-                Client.Compute.SubmitBroadcast(new[] { unknownNode }, EchoJob, 
"unused");
-
-            var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () => 
await taskMap[unknownNode]);
+            var ex = Assert.ThrowsAsync<NodeNotFoundException>(
+                async () => await 
Client.Compute.SubmitBroadcastAsync(BroadcastJobTarget.Nodes(unknownNode), 
EchoJob, "unused"));
 
             StringAssert.Contains("None of the specified nodes are present in 
the cluster: [y]", ex!.Message);
             Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
@@ -633,7 +631,7 @@ namespace Apache.Ignite.Tests.Compute
         public async Task TestJobExecutionStatusNull()
         {
             var fakeJobExecution = new JobExecution<int>(
-                Guid.NewGuid(), Task.FromException<(int, JobState)>(new 
Exception("x")), (Compute)Client.Compute);
+                Guid.NewGuid(), Task.FromException<(int, JobState)>(new 
Exception("x")), (Compute)Client.Compute, null!);
 
             var status = await fakeJobExecution.GetStateAsync();
 
@@ -641,14 +639,15 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
+        [Ignore("IGNITE-23495")]
         public async Task TestJobExecutionCancel()
         {
             const int sleepMs = 5000;
             var beforeStart = GetCurrentInstant();
 
             var jobExecution = await Client.Compute.SubmitAsync(await 
GetNodeAsync(1), SleepJob, sleepMs);
-            await jobExecution.CancelAsync();
 
+            // await jobExecution.CancelAsync();
             await AssertJobStatus(jobExecution, JobStatus.Canceled, 
beforeStart);
         }
 
@@ -804,19 +803,22 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
+        [Ignore("IGNITE-23495")]
         public async Task TestCancelCompletedTask()
         {
             var taskExec = await 
Client.Compute.SubmitMapReduceAsync(NodeNameTask, "arg");
 
             await taskExec.GetResultAsync();
-            var cancelRes = await taskExec.CancelAsync();
+
+            // var cancelRes = await taskExec.CancelAsync();
             var state = await taskExec.GetStateAsync();
 
-            Assert.IsFalse(cancelRes);
+            // Assert.IsFalse(cancelRes);
             Assert.AreEqual(TaskStatus.Completed, state!.Status);
         }
 
         [Test]
+        [Ignore("IGNITE-23495")]
         public async Task TestCancelExecutingTask()
         {
             var taskExec = await 
Client.Compute.SubmitMapReduceAsync(SleepTask, 3000);
@@ -824,10 +826,10 @@ namespace Apache.Ignite.Tests.Compute
             var state1 = await taskExec.GetStateAsync();
             Assert.AreEqual(TaskStatus.Executing, state1!.Status);
 
-            var cancelRes = await taskExec.CancelAsync();
+            // var cancelRes = await taskExec.CancelAsync();
             var state2 = await taskExec.GetStateAsync();
 
-            Assert.IsTrue(cancelRes);
+            // Assert.IsTrue(cancelRes);
             Assert.AreEqual(TaskStatus.Failed, state2!.Status);
 
             var ex = Assert.ThrowsAsync<ComputeException>(async () => await 
taskExec.GetResultAsync());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index c0bdd678ac..1c2df0219a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -328,7 +328,8 @@ namespace Apache.Ignite.Tests
                             rw.Write(1);
                         }
 
-                        rw.Write(Guid.NewGuid());
+                        rw.Write(Guid.NewGuid()); // Job id.
+                        WriteNode(Node, ref rw);
 
                         Send(handler, requestId, resWriter);
                         Send(handler, requestId, pooledArrayBuffer, 
isNotification: true);
@@ -419,16 +420,7 @@ namespace Apache.Ignite.Tests
 
                         foreach (var node in ClusterNodes)
                         {
-                            writer.Write(4); // Field count.
-                            writer.Write(node.Id);
-                            writer.Write(node.Name);
-
-                            var addr = node.Address is IPEndPoint ip
-                                ? (ip.Address.ToString(), ip.Port)
-                                : (((DnsEndPoint)node.Address).Host, 
((DnsEndPoint)node.Address).Port);
-
-                            writer.Write(addr.Item1);
-                            writer.Write(addr.Port);
+                            WriteNode(node, ref writer);
                         }
 
                         Send(handler, requestId, arrayBufferWriter);
@@ -452,6 +444,25 @@ namespace Apache.Ignite.Tests
             handler.Disconnect(true);
         }
 
+        private static (string Host, int Port) GetNodeAddress(IClusterNode 
node)
+        {
+            return node.Address is IPEndPoint ip
+                ? (ip.Address.ToString(), ip.Port)
+                : (((DnsEndPoint)node.Address).Host, 
((DnsEndPoint)node.Address).Port);
+        }
+
+        private static void WriteNode(IClusterNode node, ref MsgPackWriter 
writer)
+        {
+            writer.Write(4); // Field count.
+
+            writer.Write(node.Id);
+            writer.Write(node.Name);
+
+            var (host, port) = GetNodeAddress(node);
+            writer.Write(host);
+            writer.Write(port);
+        }
+
         private void Send(Socket socket, long requestId, PooledArrayBuffer 
writer, bool isError = false, bool isNotification = false)
             => Send(socket, requestId, writer.GetWrittenMemory(), isError, 
isNotification);
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index a41dbf7dd7..1496000e2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -174,7 +174,7 @@ public class ColocationHashTests : IgniteTestsBase
         var schemas = table.GetFieldValue<IDictionary<int, 
Task<Schema>>>("_schemas");
         var schema = schemas[1].GetAwaiter().GetResult();
         var clusterNodes = await Client.GetClusterNodesAsync();
-        var jobTarget = JobTarget.AnyNode(clusterNodes);
+        var jobTarget = JobTarget.AnyNode(clusterNodes.ToArray());
         var job = new JobDescriptor<object, int>(TableRowColocationHashJob);
 
         for (int i = 0; i < 100; i++)
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs 
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 9b9b108447..037b47afe6 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -117,7 +117,7 @@ namespace Apache.Ignite
         TupleContainsKey,
 
         /// <summary>
-        /// Compute (<see cref="ICompute.SubmitAsync{TTarget,TArg,TResult}"/>, 
<see cref="ICompute.SubmitBroadcast{TArg,TResult}"/>).
+        /// Compute (<see cref="ICompute.SubmitAsync{TTarget,TArg,TResult}"/>, 
<see cref="ICompute.SubmitBroadcastAsync{TTarget,TArg,TResult}"/>).
         /// </summary>
         ComputeExecute,
 
@@ -142,7 +142,7 @@ namespace Apache.Ignite
         ComputeGetStatus,
 
         /// <summary>
-        /// Cancel compute job (<see cref="IJobExecution{T}.CancelAsync"/>).
+        /// Cancel compute job.
         /// </summary>
         ComputeCancel,
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs
new file mode 100644
index 0000000000..8015f4f78f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+using Internal.Common;
+using Network;
+
+/// <summary>
+/// Compute broadcast job target.
+/// </summary>
+public static class BroadcastJobTarget
+{
+    /// <summary>
+    /// Creates a broadcast job target for all specified nodes.
+    /// </summary>
+    /// <param name="nodes">Nodes to run the job on.</param>
+    /// <returns>Job target.</returns>
+    public static IBroadcastJobTarget<IEnumerable<IClusterNode>> 
Nodes(IEnumerable<IClusterNode> nodes)
+    {
+        IgniteArgumentCheck.NotNull(nodes);
+
+        return new AllNodesTarget(nodes);
+    }
+
+    /// <summary>
+    /// Creates a broadcast job target for all specified nodes.
+    /// </summary>
+    /// <param name="nodes">Nodes to run the job on.</param>
+    /// <returns>Job target.</returns>
+    public static IBroadcastJobTarget<IEnumerable<IClusterNode>> Nodes(params 
IClusterNode[] nodes)
+    {
+        IgniteArgumentCheck.NotNull(nodes);
+
+        return new AllNodesTarget(nodes);
+    }
+
+    /// <summary>
+    /// All nodes broadcast job target.
+    /// </summary>
+    /// <param name="Data">Nodes.</param>
+    internal record AllNodesTarget(IEnumerable<IClusterNode> Data) : 
IBroadcastJobTarget<IEnumerable<IClusterNode>>;
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs
new file mode 100644
index 0000000000..ddc8ada510
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+
+/// <summary>
+/// Broadcast execution control object, provides information about the 
broadcast execution process and result.
+/// </summary>
+/// <typeparam name="TResult">Job result type.</typeparam>
+public interface IBroadcastExecution<TResult>
+{
+    /// <summary>
+    /// Gets the job executions.
+    /// </summary>
+    IReadOnlyList<IJobExecution<TResult>> JobExecutions { get; }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs
new file mode 100644
index 0000000000..113bc53591
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+/// <summary>
+/// Compute broadcast job target.
+/// </summary>
+/// <typeparam name="T">Underlying data type.</typeparam>
+public interface IBroadcastJobTarget<out T>
+    where T : notnull
+{
+    /// <summary>
+    /// Gets the target data.
+    /// </summary>
+    T Data { get; }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 9c875d2308..d42f936b91 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -17,9 +17,7 @@
 
 namespace Apache.Ignite.Compute;
 
-using System.Collections.Generic;
 using System.Threading.Tasks;
-using Network;
 
 /// <summary>
 /// Ignite Compute API provides distributed job execution functionality.
@@ -29,7 +27,7 @@ public interface ICompute
     /// <summary>
     /// Submits a compute job represented by the given class for an execution 
on one of the specified nodes.
     /// </summary>
-    /// <param name="target">Job execution target.</param>
+    /// <param name="target">Job execution target. See factory methods in <see 
cref="JobTarget"/>.</param>
     /// <param name="jobDescriptor">Job descriptor.</param>
     /// <param name="arg">Job argument.</param>
     /// <typeparam name="TTarget">Job target type.</typeparam>
@@ -43,18 +41,20 @@ public interface ICompute
         where TTarget : notnull;
 
     /// <summary>
-    /// Submits a compute job represented by the given class for an execution 
on all of the specified nodes.
+    /// Submits a compute job represented by the given class for an execution 
on the specified target.
     /// </summary>
-    /// <param name="nodes">Nodes to use for the job execution.</param>
+    /// <param name="target">Job target. See factory methods in <see 
cref="BroadcastJobTarget"/>.</param>
     /// <param name="jobDescriptor">Job descriptor.</param>
     /// <param name="arg">Job argument.</param>
+    /// <typeparam name="TTarget">Job target type.</typeparam>
     /// <typeparam name="TArg">Job argument type.</typeparam>
     /// <typeparam name="TResult">Job result type.</typeparam>
-    /// <returns>A map of <see cref="Task"/> representing the asynchronous 
operation for every node.</returns>
-    IDictionary<IClusterNode, Task<IJobExecution<TResult>>> 
SubmitBroadcast<TArg, TResult>(
-        IEnumerable<IClusterNode> nodes,
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task<IBroadcastExecution<TResult>> SubmitBroadcastAsync<TTarget, TArg, 
TResult>(
+        IBroadcastJobTarget<TTarget> target,
         JobDescriptor<TArg, TResult> jobDescriptor,
-        TArg arg);
+        TArg arg)
+        where TTarget : notnull;
 
     /// <summary>
     /// Submits a compute map-reduce task represented by the given class.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
index 03dd11a2e5..98681754d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Compute;
 
 using System;
 using System.Threading.Tasks;
+using Network;
 
 /// <summary>
 /// Job control object, provides information about the job execution process 
and result, allows cancelling the job.
@@ -31,6 +32,11 @@ public interface IJobExecution<T>
     /// </summary>
     Guid Id { get; }
 
+    /// <summary>
+    /// Gets the node where the job is executing.
+    /// </summary>
+    IClusterNode Node { get; }
+
     /// <summary>
     /// Gets the job execution result.
     /// </summary>
@@ -45,15 +51,6 @@ public interface IJobExecution<T>
     /// </returns>
     Task<JobState?> GetStateAsync();
 
-    /// <summary>
-    /// Cancels the job execution.
-    /// </summary>
-    /// <returns>
-    /// Returns <c>true</c> if the job was successfully cancelled, 
<c>false</c> if the job has already finished,
-    /// <c>null</c> if the job was not found (no longer exists due to 
exceeding the retention time limit).
-    /// </returns>
-    Task<bool?> CancelAsync();
-
     /// <summary>
     /// Changes the job priority. After priority change the job will be the 
last in the queue of jobs with the same priority.
     /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
index f007978f30..8194f341c3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
@@ -51,15 +51,6 @@ public interface ITaskExecution<T>
     /// </returns>
     Task<TaskState?> GetStateAsync();
 
-    /// <summary>
-    /// Cancels the task execution.
-    /// </summary>
-    /// <returns>
-    /// Returns <c>true</c> if the task was successfully cancelled, 
<c>false</c> if the task has already finished,
-    /// <c>null</c> if the task was not found (no longer exists due to 
exceeding the retention time limit).
-    /// </returns>
-    Task<bool?> CancelAsync();
-
     /// <summary>
     /// Changes the task priority. After priority change the task will be the 
last in the queue of tasks with the same priority.
     /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs
new file mode 100644
index 0000000000..41a9866aa7
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System.Collections.Generic;
+using Ignite.Compute;
+
+/// <summary>
+/// Broadcast execution.
+/// </summary>
+/// <param name="JobExecutions">Job executions.</param>
+/// <typeparam name="TResult">Job result type.</typeparam>
+internal sealed record 
BroadcastExecution<TResult>(IReadOnlyList<IJobExecution<TResult>> 
JobExecutions) : IBroadcastExecution<TResult>;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 275d66fbf5..82fcaef2be 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Internal.Compute
     using Ignite.Network;
     using Ignite.Table;
     using Marshalling;
+    using Network;
     using Proto;
     using Proto.MsgPack;
     using Table;
@@ -82,24 +83,37 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         /// <inheritdoc/>
-        public IDictionary<IClusterNode, Task<IJobExecution<TResult>>> 
SubmitBroadcast<TArg, TResult>(
-            IEnumerable<IClusterNode> nodes,
+        public async Task<IBroadcastExecution<TResult>> 
SubmitBroadcastAsync<TTarget, TArg, TResult>(
+            IBroadcastJobTarget<TTarget> target,
             JobDescriptor<TArg, TResult> jobDescriptor,
             TArg arg)
+            where TTarget : notnull
         {
-            IgniteArgumentCheck.NotNull(nodes);
+            IgniteArgumentCheck.NotNull(target);
             IgniteArgumentCheck.NotNull(jobDescriptor);
             IgniteArgumentCheck.NotNull(jobDescriptor.JobClassName);
 
-            var res = new Dictionary<IClusterNode, 
Task<IJobExecution<TResult>>>();
+            return target switch
+            {
+                BroadcastJobTarget.AllNodesTarget allNodes => await 
SubmitBroadcastAsyncInternal(allNodes.Data)
+                    .ConfigureAwait(false),
+
+                _ => throw new ArgumentException("Unsupported broadcast job 
target: " + target)
+            };
 
-            foreach (var node in nodes)
+            async Task<IBroadcastExecution<TResult>> 
SubmitBroadcastAsyncInternal(IEnumerable<IClusterNode> nodes)
             {
-                Task<IJobExecution<TResult>> task = ExecuteOnNodes(new[] { 
node }, jobDescriptor, arg);
-                res[node] = task;
-            }
+                var jobExecutions = new List<IJobExecution<TResult>>();
 
-            return res;
+                foreach (var node in nodes)
+                {
+                    IJobExecution<TResult> jobExec = await 
ExecuteOnNodes([node], jobDescriptor, arg).ConfigureAwait(false);
+
+                    jobExecutions.Add(jobExec);
+                }
+
+                return new BroadcastExecution<TResult>(jobExecutions);
+            }
         }
 
         /// <inheritdoc/>
@@ -314,8 +328,9 @@ namespace Apache.Ignite.Internal.Compute
 
             var jobId = reader.ReadGuid();
             var resultTask = 
GetResult((NotificationHandler)computeExecuteResult.Metadata!);
+            var node = ClusterNode.Read(ref reader);
 
-            return new JobExecution<T>(jobId, resultTask, this);
+            return new JobExecution<T>(jobId, resultTask, this, node);
 
             async Task<(T, JobState)> GetResult(NotificationHandler handler)
             {
@@ -377,10 +392,12 @@ namespace Apache.Ignite.Internal.Compute
             using var writer = ProtoCommon.GetMessageWriter();
             Write();
 
-            using PooledBuffer res = await _socket.DoOutInOpAsync(
-                    ClientOp.ComputeExecute, writer, 
PreferredNode.FromName(node.Name), expectNotifications: true)
+            var (buf, sock) = await _socket.DoOutInOpAndGetSocketAsync(
+                    ClientOp.ComputeExecute, tx: null, writer, 
PreferredNode.FromName(node.Name), expectNotifications: true)
                 .ConfigureAwait(false);
 
+            using var res = buf;
+
             return GetJobExecution(res, readSchema: false, 
jobDescriptor.ResultMarshaller);
 
             void Write()
@@ -442,10 +459,12 @@ namespace Apache.Ignite.Internal.Compute
                     var colocationHash = Write(bufferWriter, table, schema);
                     var preferredNode = await 
table.GetPreferredNode(colocationHash, null).ConfigureAwait(false);
 
-                    using var res = await _socket.DoOutInOpAsync(
-                            ClientOp.ComputeExecuteColocated, bufferWriter, 
preferredNode, expectNotifications: true)
+                    var (resBuf, sock) = await 
_socket.DoOutInOpAndGetSocketAsync(
+                            ClientOp.ComputeExecuteColocated, tx: null, 
bufferWriter, preferredNode, expectNotifications: true)
                         .ConfigureAwait(false);
 
+                    using var res = resBuf;
+
                     return GetJobExecution(res, readSchema: true, 
descriptor.ResultMarshaller);
                 }
                 catch (IgniteException e) when (e.Code == 
ErrorGroups.Client.TableIdNotFound)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
index 942018d345..805ab8b7d0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Compute;
 using System;
 using System.Threading.Tasks;
 using Ignite.Compute;
+using Ignite.Network;
 
 /// <summary>
 /// Job execution.
@@ -39,9 +40,12 @@ internal sealed record JobExecution<T> : IJobExecution<T>
     /// <param name="id">Job id.</param>
     /// <param name="resultTask">Result task.</param>
     /// <param name="compute">Compute.</param>
-    public JobExecution(Guid id, Task<(T Result, JobState Status)> resultTask, 
Compute compute)
+    /// <param name="node">Job node.</param>
+    public JobExecution(Guid id, Task<(T Result, JobState Status)> resultTask, 
Compute compute, IClusterNode node)
     {
         Id = id;
+        Node = node;
+
         _resultTask = resultTask;
         _compute = compute;
 
@@ -52,6 +56,9 @@ internal sealed record JobExecution<T> : IJobExecution<T>
     /// <inheritdoc/>
     public Guid Id { get; }
 
+    /// <inheritdoc/>
+    public IClusterNode Node { get; }
+
     /// <inheritdoc/>
     public async Task<T> GetResultAsync()
     {
@@ -78,10 +85,6 @@ internal sealed record JobExecution<T> : IJobExecution<T>
         return status;
     }
 
-    /// <inheritdoc/>
-    public async Task<bool?> CancelAsync() =>
-        await _compute.CancelJobAsync(Id).ConfigureAwait(false);
-
     /// <inheritdoc/>
     public async Task<bool?> ChangePriorityAsync(int priority) =>
         await _compute.ChangeJobPriorityAsync(Id, 
priority).ConfigureAwait(false);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
index d7b5ac55fe..3eda0dc45f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
@@ -86,10 +86,6 @@ internal sealed record TaskExecution<T> : ITaskExecution<T>
         return state;
     }
 
-    /// <inheritdoc/>
-    public async Task<bool?> CancelAsync() =>
-        await _compute.CancelJobAsync(Id).ConfigureAwait(false);
-
     /// <inheritdoc/>
     public async Task<bool?> ChangePriorityAsync(int priority) =>
         await _compute.ChangeJobPriorityAsync(Id, 
priority).ConfigureAwait(false);

Reply via email to