This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6989419b03 IGNITE-24197 .NET: Unify Сompute API (#5050)
6989419b03 is described below
commit 6989419b03f9c69c4e10568399a9c4ae21fc8037
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jan 16 15:45:26 2025 +0200
IGNITE-24197 .NET: Unify Сompute API (#5050)
Use Target and Execution objects in all Compute API methods.
* Add `IBroadcastJobTarget`
* Add `IBroadcastExecution`
* Remove `CancelAsync` from `JobExecution` and `TaskExecution` (use
`CancellationToken` for cancellation - to be added separately)
Reflects Java API changes from IGNITE-24174 (#5016)
---
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 48 +++++++++---------
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 33 ++++++++----
.../Proto/ColocationHashTests.cs | 2 +-
.../dotnet/Apache.Ignite/ClientOperationType.cs | 4 +-
.../Apache.Ignite/Compute/BroadcastJobTarget.cs | 58 ++++++++++++++++++++++
.../Apache.Ignite/Compute/IBroadcastExecution.cs | 32 ++++++++++++
.../Apache.Ignite/Compute/IBroadcastJobTarget.cs | 31 ++++++++++++
.../dotnet/Apache.Ignite/Compute/ICompute.cs | 18 +++----
.../dotnet/Apache.Ignite/Compute/IJobExecution.cs | 15 +++---
.../dotnet/Apache.Ignite/Compute/ITaskExecution.cs | 9 ----
.../Internal/Compute/BroadcastExecution.cs | 28 +++++++++++
.../Apache.Ignite/Internal/Compute/Compute.cs | 47 ++++++++++++------
.../Apache.Ignite/Internal/Compute/JobExecution.cs | 13 +++--
.../Internal/Compute/TaskExecution.cs | 4 --
14 files changed, 255 insertions(+), 87 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 44586787ba..de2ab5d7f6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -133,17 +133,15 @@ namespace Apache.Ignite.Tests.Compute
{
var node = (await GetNodeAsync(0)).Data;
- IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
Client.Compute.SubmitBroadcast(
- new[] { node },
+ var broadcastExec = await Client.Compute.SubmitBroadcastAsync(
+ BroadcastJobTarget.Nodes(node),
NodeNameJob,
"123");
- var res = await taskMap[node];
+ var jobExec = broadcastExec.JobExecutions.Single();
- Assert.AreEqual(1, taskMap.Count);
- Assert.AreSame(node, taskMap.Keys.Single());
-
- Assert.AreEqual(PlatformTestNodeRunner + "123", await
res.GetResultAsync());
+ Assert.AreEqual(node, jobExec.Node);
+ Assert.AreEqual(PlatformTestNodeRunner + "123", await
jobExec.GetResultAsync());
}
[Test]
@@ -151,15 +149,17 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await Client.GetClusterNodesAsync();
- IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
Client.Compute.SubmitBroadcast(
- nodes,
+ IBroadcastExecution<string> broadcastExecution = await
Client.Compute.SubmitBroadcastAsync(
+ BroadcastJobTarget.Nodes(nodes),
NodeNameJob,
"123");
- var res1 = await taskMap[nodes[0]];
- var res2 = await taskMap[nodes[1]];
- var res3 = await taskMap[nodes[2]];
- var res4 = await taskMap[nodes[3]];
+ var taskMap = broadcastExecution.JobExecutions.ToDictionary(x =>
x.Node);
+
+ IJobExecution<string> res1 = taskMap[nodes[0]];
+ IJobExecution<string> res2 = taskMap[nodes[1]];
+ IJobExecution<string> res3 = taskMap[nodes[2]];
+ IJobExecution<string> res4 = taskMap[nodes[3]];
Assert.AreEqual(4, taskMap.Count);
@@ -212,10 +212,8 @@ namespace Apache.Ignite.Tests.Compute
{
var unknownNode = new ClusterNode(Guid.NewGuid(), "y", new
IPEndPoint(IPAddress.Loopback, 0));
- IDictionary<IClusterNode, Task<IJobExecution<object>>> taskMap =
- Client.Compute.SubmitBroadcast(new[] { unknownNode }, EchoJob,
"unused");
-
- var ex = Assert.ThrowsAsync<NodeNotFoundException>(async () =>
await taskMap[unknownNode]);
+ var ex = Assert.ThrowsAsync<NodeNotFoundException>(
+ async () => await
Client.Compute.SubmitBroadcastAsync(BroadcastJobTarget.Nodes(unknownNode),
EchoJob, "unused"));
StringAssert.Contains("None of the specified nodes are present in
the cluster: [y]", ex!.Message);
Assert.AreEqual(ErrorGroups.Compute.NodeNotFound, ex.Code);
@@ -633,7 +631,7 @@ namespace Apache.Ignite.Tests.Compute
public async Task TestJobExecutionStatusNull()
{
var fakeJobExecution = new JobExecution<int>(
- Guid.NewGuid(), Task.FromException<(int, JobState)>(new
Exception("x")), (Compute)Client.Compute);
+ Guid.NewGuid(), Task.FromException<(int, JobState)>(new
Exception("x")), (Compute)Client.Compute, null!);
var status = await fakeJobExecution.GetStateAsync();
@@ -641,14 +639,15 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
+ [Ignore("IGNITE-23495")]
public async Task TestJobExecutionCancel()
{
const int sleepMs = 5000;
var beforeStart = GetCurrentInstant();
var jobExecution = await Client.Compute.SubmitAsync(await
GetNodeAsync(1), SleepJob, sleepMs);
- await jobExecution.CancelAsync();
+ // await jobExecution.CancelAsync();
await AssertJobStatus(jobExecution, JobStatus.Canceled,
beforeStart);
}
@@ -804,19 +803,22 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
+ [Ignore("IGNITE-23495")]
public async Task TestCancelCompletedTask()
{
var taskExec = await
Client.Compute.SubmitMapReduceAsync(NodeNameTask, "arg");
await taskExec.GetResultAsync();
- var cancelRes = await taskExec.CancelAsync();
+
+ // var cancelRes = await taskExec.CancelAsync();
var state = await taskExec.GetStateAsync();
- Assert.IsFalse(cancelRes);
+ // Assert.IsFalse(cancelRes);
Assert.AreEqual(TaskStatus.Completed, state!.Status);
}
[Test]
+ [Ignore("IGNITE-23495")]
public async Task TestCancelExecutingTask()
{
var taskExec = await
Client.Compute.SubmitMapReduceAsync(SleepTask, 3000);
@@ -824,10 +826,10 @@ namespace Apache.Ignite.Tests.Compute
var state1 = await taskExec.GetStateAsync();
Assert.AreEqual(TaskStatus.Executing, state1!.Status);
- var cancelRes = await taskExec.CancelAsync();
+ // var cancelRes = await taskExec.CancelAsync();
var state2 = await taskExec.GetStateAsync();
- Assert.IsTrue(cancelRes);
+ // Assert.IsTrue(cancelRes);
Assert.AreEqual(TaskStatus.Failed, state2!.Status);
var ex = Assert.ThrowsAsync<ComputeException>(async () => await
taskExec.GetResultAsync());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index c0bdd678ac..1c2df0219a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -328,7 +328,8 @@ namespace Apache.Ignite.Tests
rw.Write(1);
}
- rw.Write(Guid.NewGuid());
+ rw.Write(Guid.NewGuid()); // Job id.
+ WriteNode(Node, ref rw);
Send(handler, requestId, resWriter);
Send(handler, requestId, pooledArrayBuffer,
isNotification: true);
@@ -419,16 +420,7 @@ namespace Apache.Ignite.Tests
foreach (var node in ClusterNodes)
{
- writer.Write(4); // Field count.
- writer.Write(node.Id);
- writer.Write(node.Name);
-
- var addr = node.Address is IPEndPoint ip
- ? (ip.Address.ToString(), ip.Port)
- : (((DnsEndPoint)node.Address).Host,
((DnsEndPoint)node.Address).Port);
-
- writer.Write(addr.Item1);
- writer.Write(addr.Port);
+ WriteNode(node, ref writer);
}
Send(handler, requestId, arrayBufferWriter);
@@ -452,6 +444,25 @@ namespace Apache.Ignite.Tests
handler.Disconnect(true);
}
+ private static (string Host, int Port) GetNodeAddress(IClusterNode
node)
+ {
+ return node.Address is IPEndPoint ip
+ ? (ip.Address.ToString(), ip.Port)
+ : (((DnsEndPoint)node.Address).Host,
((DnsEndPoint)node.Address).Port);
+ }
+
+ private static void WriteNode(IClusterNode node, ref MsgPackWriter
writer)
+ {
+ writer.Write(4); // Field count.
+
+ writer.Write(node.Id);
+ writer.Write(node.Name);
+
+ var (host, port) = GetNodeAddress(node);
+ writer.Write(host);
+ writer.Write(port);
+ }
+
private void Send(Socket socket, long requestId, PooledArrayBuffer
writer, bool isError = false, bool isNotification = false)
=> Send(socket, requestId, writer.GetWrittenMemory(), isError,
isNotification);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index a41dbf7dd7..1496000e2b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -174,7 +174,7 @@ public class ColocationHashTests : IgniteTestsBase
var schemas = table.GetFieldValue<IDictionary<int,
Task<Schema>>>("_schemas");
var schema = schemas[1].GetAwaiter().GetResult();
var clusterNodes = await Client.GetClusterNodesAsync();
- var jobTarget = JobTarget.AnyNode(clusterNodes);
+ var jobTarget = JobTarget.AnyNode(clusterNodes.ToArray());
var job = new JobDescriptor<object, int>(TableRowColocationHashJob);
for (int i = 0; i < 100; i++)
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 9b9b108447..037b47afe6 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -117,7 +117,7 @@ namespace Apache.Ignite
TupleContainsKey,
/// <summary>
- /// Compute (<see cref="ICompute.SubmitAsync{TTarget,TArg,TResult}"/>,
<see cref="ICompute.SubmitBroadcast{TArg,TResult}"/>).
+ /// Compute (<see cref="ICompute.SubmitAsync{TTarget,TArg,TResult}"/>,
<see cref="ICompute.SubmitBroadcastAsync{TTarget,TArg,TResult}"/>).
/// </summary>
ComputeExecute,
@@ -142,7 +142,7 @@ namespace Apache.Ignite
ComputeGetStatus,
/// <summary>
- /// Cancel compute job (<see cref="IJobExecution{T}.CancelAsync"/>).
+ /// Cancel compute job.
/// </summary>
ComputeCancel,
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs
new file mode 100644
index 0000000000..8015f4f78f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/BroadcastJobTarget.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+using Internal.Common;
+using Network;
+
+/// <summary>
+/// Compute broadcast job target.
+/// </summary>
+public static class BroadcastJobTarget
+{
+ /// <summary>
+ /// Creates a broadcast job target for all specified nodes.
+ /// </summary>
+ /// <param name="nodes">Nodes to run the job on.</param>
+ /// <returns>Job target.</returns>
+ public static IBroadcastJobTarget<IEnumerable<IClusterNode>>
Nodes(IEnumerable<IClusterNode> nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes);
+
+ return new AllNodesTarget(nodes);
+ }
+
+ /// <summary>
+ /// Creates a broadcast job target for all specified nodes.
+ /// </summary>
+ /// <param name="nodes">Nodes to run the job on.</param>
+ /// <returns>Job target.</returns>
+ public static IBroadcastJobTarget<IEnumerable<IClusterNode>> Nodes(params
IClusterNode[] nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes);
+
+ return new AllNodesTarget(nodes);
+ }
+
+ /// <summary>
+ /// All nodes broadcast job target.
+ /// </summary>
+ /// <param name="Data">Nodes.</param>
+ internal record AllNodesTarget(IEnumerable<IClusterNode> Data) :
IBroadcastJobTarget<IEnumerable<IClusterNode>>;
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs
new file mode 100644
index 0000000000..ddc8ada510
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastExecution.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+
+/// <summary>
+/// Broadcast execution control object, provides information about the
broadcast execution process and result.
+/// </summary>
+/// <typeparam name="TResult">Job result type.</typeparam>
+public interface IBroadcastExecution<TResult>
+{
+ /// <summary>
+ /// Gets the job executions.
+ /// </summary>
+ IReadOnlyList<IJobExecution<TResult>> JobExecutions { get; }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs
new file mode 100644
index 0000000000..113bc53591
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IBroadcastJobTarget.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+/// <summary>
+/// Compute broadcast job target.
+/// </summary>
+/// <typeparam name="T">Underlying data type.</typeparam>
+public interface IBroadcastJobTarget<out T>
+ where T : notnull
+{
+ /// <summary>
+ /// Gets the target data.
+ /// </summary>
+ T Data { get; }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 9c875d2308..d42f936b91 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -17,9 +17,7 @@
namespace Apache.Ignite.Compute;
-using System.Collections.Generic;
using System.Threading.Tasks;
-using Network;
/// <summary>
/// Ignite Compute API provides distributed job execution functionality.
@@ -29,7 +27,7 @@ public interface ICompute
/// <summary>
/// Submits a compute job represented by the given class for an execution
on one of the specified nodes.
/// </summary>
- /// <param name="target">Job execution target.</param>
+ /// <param name="target">Job execution target. See factory methods in <see
cref="JobTarget"/>.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="arg">Job argument.</param>
/// <typeparam name="TTarget">Job target type.</typeparam>
@@ -43,18 +41,20 @@ public interface ICompute
where TTarget : notnull;
/// <summary>
- /// Submits a compute job represented by the given class for an execution
on all of the specified nodes.
+ /// Submits a compute job represented by the given class for an execution
on the specified target.
/// </summary>
- /// <param name="nodes">Nodes to use for the job execution.</param>
+ /// <param name="target">Job target. See factory methods in <see
cref="BroadcastJobTarget"/>.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="arg">Job argument.</param>
+ /// <typeparam name="TTarget">Job target type.</typeparam>
/// <typeparam name="TArg">Job argument type.</typeparam>
/// <typeparam name="TResult">Job result type.</typeparam>
- /// <returns>A map of <see cref="Task"/> representing the asynchronous
operation for every node.</returns>
- IDictionary<IClusterNode, Task<IJobExecution<TResult>>>
SubmitBroadcast<TArg, TResult>(
- IEnumerable<IClusterNode> nodes,
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task<IBroadcastExecution<TResult>> SubmitBroadcastAsync<TTarget, TArg,
TResult>(
+ IBroadcastJobTarget<TTarget> target,
JobDescriptor<TArg, TResult> jobDescriptor,
- TArg arg);
+ TArg arg)
+ where TTarget : notnull;
/// <summary>
/// Submits a compute map-reduce task represented by the given class.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
index 03dd11a2e5..98681754d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Compute;
using System;
using System.Threading.Tasks;
+using Network;
/// <summary>
/// Job control object, provides information about the job execution process
and result, allows cancelling the job.
@@ -31,6 +32,11 @@ public interface IJobExecution<T>
/// </summary>
Guid Id { get; }
+ /// <summary>
+ /// Gets the node where the job is executing.
+ /// </summary>
+ IClusterNode Node { get; }
+
/// <summary>
/// Gets the job execution result.
/// </summary>
@@ -45,15 +51,6 @@ public interface IJobExecution<T>
/// </returns>
Task<JobState?> GetStateAsync();
- /// <summary>
- /// Cancels the job execution.
- /// </summary>
- /// <returns>
- /// Returns <c>true</c> if the job was successfully cancelled,
<c>false</c> if the job has already finished,
- /// <c>null</c> if the job was not found (no longer exists due to
exceeding the retention time limit).
- /// </returns>
- Task<bool?> CancelAsync();
-
/// <summary>
/// Changes the job priority. After priority change the job will be the
last in the queue of jobs with the same priority.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
index f007978f30..8194f341c3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ITaskExecution.cs
@@ -51,15 +51,6 @@ public interface ITaskExecution<T>
/// </returns>
Task<TaskState?> GetStateAsync();
- /// <summary>
- /// Cancels the task execution.
- /// </summary>
- /// <returns>
- /// Returns <c>true</c> if the task was successfully cancelled,
<c>false</c> if the task has already finished,
- /// <c>null</c> if the task was not found (no longer exists due to
exceeding the retention time limit).
- /// </returns>
- Task<bool?> CancelAsync();
-
/// <summary>
/// Changes the task priority. After priority change the task will be the
last in the queue of tasks with the same priority.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs
new file mode 100644
index 0000000000..41a9866aa7
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/BroadcastExecution.cs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System.Collections.Generic;
+using Ignite.Compute;
+
+/// <summary>
+/// Broadcast execution.
+/// </summary>
+/// <param name="JobExecutions">Job executions.</param>
+/// <typeparam name="TResult">Job result type.</typeparam>
+internal sealed record
BroadcastExecution<TResult>(IReadOnlyList<IJobExecution<TResult>>
JobExecutions) : IBroadcastExecution<TResult>;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 275d66fbf5..82fcaef2be 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Internal.Compute
using Ignite.Network;
using Ignite.Table;
using Marshalling;
+ using Network;
using Proto;
using Proto.MsgPack;
using Table;
@@ -82,24 +83,37 @@ namespace Apache.Ignite.Internal.Compute
}
/// <inheritdoc/>
- public IDictionary<IClusterNode, Task<IJobExecution<TResult>>>
SubmitBroadcast<TArg, TResult>(
- IEnumerable<IClusterNode> nodes,
+ public async Task<IBroadcastExecution<TResult>>
SubmitBroadcastAsync<TTarget, TArg, TResult>(
+ IBroadcastJobTarget<TTarget> target,
JobDescriptor<TArg, TResult> jobDescriptor,
TArg arg)
+ where TTarget : notnull
{
- IgniteArgumentCheck.NotNull(nodes);
+ IgniteArgumentCheck.NotNull(target);
IgniteArgumentCheck.NotNull(jobDescriptor);
IgniteArgumentCheck.NotNull(jobDescriptor.JobClassName);
- var res = new Dictionary<IClusterNode,
Task<IJobExecution<TResult>>>();
+ return target switch
+ {
+ BroadcastJobTarget.AllNodesTarget allNodes => await
SubmitBroadcastAsyncInternal(allNodes.Data)
+ .ConfigureAwait(false),
+
+ _ => throw new ArgumentException("Unsupported broadcast job
target: " + target)
+ };
- foreach (var node in nodes)
+ async Task<IBroadcastExecution<TResult>>
SubmitBroadcastAsyncInternal(IEnumerable<IClusterNode> nodes)
{
- Task<IJobExecution<TResult>> task = ExecuteOnNodes(new[] {
node }, jobDescriptor, arg);
- res[node] = task;
- }
+ var jobExecutions = new List<IJobExecution<TResult>>();
- return res;
+ foreach (var node in nodes)
+ {
+ IJobExecution<TResult> jobExec = await
ExecuteOnNodes([node], jobDescriptor, arg).ConfigureAwait(false);
+
+ jobExecutions.Add(jobExec);
+ }
+
+ return new BroadcastExecution<TResult>(jobExecutions);
+ }
}
/// <inheritdoc/>
@@ -314,8 +328,9 @@ namespace Apache.Ignite.Internal.Compute
var jobId = reader.ReadGuid();
var resultTask =
GetResult((NotificationHandler)computeExecuteResult.Metadata!);
+ var node = ClusterNode.Read(ref reader);
- return new JobExecution<T>(jobId, resultTask, this);
+ return new JobExecution<T>(jobId, resultTask, this, node);
async Task<(T, JobState)> GetResult(NotificationHandler handler)
{
@@ -377,10 +392,12 @@ namespace Apache.Ignite.Internal.Compute
using var writer = ProtoCommon.GetMessageWriter();
Write();
- using PooledBuffer res = await _socket.DoOutInOpAsync(
- ClientOp.ComputeExecute, writer,
PreferredNode.FromName(node.Name), expectNotifications: true)
+ var (buf, sock) = await _socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.ComputeExecute, tx: null, writer,
PreferredNode.FromName(node.Name), expectNotifications: true)
.ConfigureAwait(false);
+ using var res = buf;
+
return GetJobExecution(res, readSchema: false,
jobDescriptor.ResultMarshaller);
void Write()
@@ -442,10 +459,12 @@ namespace Apache.Ignite.Internal.Compute
var colocationHash = Write(bufferWriter, table, schema);
var preferredNode = await
table.GetPreferredNode(colocationHash, null).ConfigureAwait(false);
- using var res = await _socket.DoOutInOpAsync(
- ClientOp.ComputeExecuteColocated, bufferWriter,
preferredNode, expectNotifications: true)
+ var (resBuf, sock) = await
_socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.ComputeExecuteColocated, tx: null,
bufferWriter, preferredNode, expectNotifications: true)
.ConfigureAwait(false);
+ using var res = resBuf;
+
return GetJobExecution(res, readSchema: true,
descriptor.ResultMarshaller);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Client.TableIdNotFound)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
index 942018d345..805ab8b7d0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Internal.Compute;
using System;
using System.Threading.Tasks;
using Ignite.Compute;
+using Ignite.Network;
/// <summary>
/// Job execution.
@@ -39,9 +40,12 @@ internal sealed record JobExecution<T> : IJobExecution<T>
/// <param name="id">Job id.</param>
/// <param name="resultTask">Result task.</param>
/// <param name="compute">Compute.</param>
- public JobExecution(Guid id, Task<(T Result, JobState Status)> resultTask,
Compute compute)
+ /// <param name="node">Job node.</param>
+ public JobExecution(Guid id, Task<(T Result, JobState Status)> resultTask,
Compute compute, IClusterNode node)
{
Id = id;
+ Node = node;
+
_resultTask = resultTask;
_compute = compute;
@@ -52,6 +56,9 @@ internal sealed record JobExecution<T> : IJobExecution<T>
/// <inheritdoc/>
public Guid Id { get; }
+ /// <inheritdoc/>
+ public IClusterNode Node { get; }
+
/// <inheritdoc/>
public async Task<T> GetResultAsync()
{
@@ -78,10 +85,6 @@ internal sealed record JobExecution<T> : IJobExecution<T>
return status;
}
- /// <inheritdoc/>
- public async Task<bool?> CancelAsync() =>
- await _compute.CancelJobAsync(Id).ConfigureAwait(false);
-
/// <inheritdoc/>
public async Task<bool?> ChangePriorityAsync(int priority) =>
await _compute.ChangeJobPriorityAsync(Id,
priority).ConfigureAwait(false);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
index d7b5ac55fe..3eda0dc45f 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/TaskExecution.cs
@@ -86,10 +86,6 @@ internal sealed record TaskExecution<T> : ITaskExecution<T>
return state;
}
- /// <inheritdoc/>
- public async Task<bool?> CancelAsync() =>
- await _compute.CancelJobAsync(Id).ConfigureAwait(false);
-
/// <inheritdoc/>
public async Task<bool?> ChangePriorityAsync(int priority) =>
await _compute.ChangeJobPriorityAsync(Id,
priority).ConfigureAwait(false);