This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f2c02191d0 IGNITE-21289 .NET: Implement job execution interface (#3159)
f2c02191d0 is described below
commit f2c02191d03505d592e8e76aa1b6494568668119
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Feb 7 11:30:52 2024 +0200
IGNITE-21289 .NET: Implement job execution interface (#3159)
Add job execution API:
* Return `IJobExecution<T>` from `ICompute` methods instead of a raw result
* Implement `GetStatusAsync`, `CancelAsync`, `ChangePriorityAsync`
---
.../java/org/apache/ignite/compute/JobState.java | 2 +-
.../java/org/apache/ignite/compute/JobStatus.java | 73 +++++++++
.../Compute/ComputeClusterAwarenessTests.cs | 21 +--
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 173 ++++++++++++++++-----
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 22 ++-
.../PartitionAwarenessRealClusterTests.cs | 4 +-
.../Proto/ColocationHashTests.cs | 8 +-
.../Table/SchemaSynchronizationTest.cs | 5 +-
.../dotnet/Apache.Ignite/ClientOperationType.cs | 17 +-
.../dotnet/Apache.Ignite/Compute/ICompute.cs | 8 +-
.../dotnet/Apache.Ignite/Compute/IJobExecution.cs | 67 ++++++++
.../dotnet/Apache.Ignite/Compute/JobState.cs} | 59 +++----
.../dotnet/Apache.Ignite/Compute/JobStatus.cs} | 51 ++----
.../Apache.Ignite/Internal/Compute/Compute.cs | 129 +++++++++++----
.../Apache.Ignite/Internal/Compute/JobExecution.cs | 95 +++++++++++
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 11 +-
.../Internal/Proto/ClientOpExtensions.cs | 3 +
.../Internal/Proto/MsgPack/MsgPackReader.cs | 15 ++
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 3 +
19 files changed, 618 insertions(+), 148 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
index 4c7d18a4cc..a60c025217 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
@@ -42,7 +42,7 @@ public enum JobState {
COMPLETED,
/**
- * The job has received the cancel command, but it is still running.
+ * The job has received the cancel command, but is still running.
*/
CANCELING,
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
index 3ee4ac2910..e04f27c74e 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
@@ -64,32 +64,67 @@ public class JobStatus implements Serializable {
this.finishTime = builder.finishTime;
}
+ /**
+ * Creates a new builder.
+ *
+ * @return Builder.
+ */
public static Builder builder() {
return new Builder();
}
+ /**
+ * Returns job ID.
+ *
+ * @return Job ID.
+ */
public UUID id() {
return id;
}
+ /**
+ * Returns job state.
+ *
+ * @return Job state.
+ */
public JobState state() {
return state;
}
+ /**
+ * Returns job create time.
+ *
+ * @return Job create time.
+ */
public Instant createTime() {
return createTime;
}
+ /**
+ * Returns job start time. {@code null} if the job has not started yet.
+ *
+ * @return Job start time. {@code null} if the job has not started yet.
+ */
@Nullable
public Instant startTime() {
return startTime;
}
+ /**
+ * Returns job finish time. {@code null} if the job has not finished yet.
+ *
+ * @return Job finish time. {@code null} if the job has not finished yet.
+ */
@Nullable
public Instant finishTime() {
return finishTime;
}
+ /**
+ * Returns a new builder with the same property values as this JobStatus.
+ *
+ * @return Builder.
+ */
public Builder toBuilder() {
return new Builder(this);
}
@@ -106,6 +141,9 @@ public class JobStatus implements Serializable {
@Nullable
private Instant finishTime;
+ /**
+ * Constructor.
+ */
public Builder() {
}
@@ -117,31 +155,66 @@ public class JobStatus implements Serializable {
this.finishTime = status.finishTime;
}
+ /**
+ * Sets job ID.
+ *
+ * @param id Job ID.
+ * @return This builder.
+ */
public Builder id(UUID id) {
this.id = id;
return this;
}
+ /**
+ * Sets job state.
+ *
+ * @param state Job state.
+ * @return This builder.
+ */
public Builder state(JobState state) {
this.state = state;
return this;
}
+ /**
+ * Sets job create time.
+ *
+ * @param createTime Job create time.
+ * @return This builder.
+ */
public Builder createTime(Instant createTime) {
this.createTime = createTime;
return this;
}
+ /**
+ * Sets job start time.
+ *
+ * @param startTime Job start time.
+ * @return This builder.
+ */
public Builder startTime(@Nullable Instant startTime) {
this.startTime = startTime;
return this;
}
+ /**
+ * Sets job finish time.
+ *
+ * @param finishTime Job finish time.
+ * @return This builder.
+ */
public Builder finishTime(@Nullable Instant finishTime) {
this.finishTime = finishTime;
return this;
}
+ /**
+ * Builds a new JobStatus.
+ *
+ * @return JobStatus.
+ */
public JobStatus build() {
return new JobStatus(this);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 7b1fede7cc..25ac8b6c99 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -45,14 +45,14 @@ namespace Apache.Ignite.Tests.Compute
using var client = await IgniteClient.StartAsync(clientCfg);
client.WaitForConnections(3);
- var res2 = await client.Compute.ExecuteAsync<string>(
+ IJobExecution<string> exec2 = await
client.Compute.ExecuteAsync<string>(
new[] { server2.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
- var res3 = await client.Compute.ExecuteAsync<string>(
+ IJobExecution<string> exec3 = await
client.Compute.ExecuteAsync<string>(
new[] { server3.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
- Assert.AreEqual("s2", res2);
- Assert.AreEqual("s3", res3);
+ Assert.AreEqual("s2", await exec2.GetResultAsync());
+ Assert.AreEqual("s3", await exec3.GetResultAsync());
Assert.AreEqual(ClientOp.ComputeExecute,
server2.ClientOps.Single());
Assert.AreEqual(ClientOp.ComputeExecute,
server3.ClientOps.Single());
@@ -69,14 +69,14 @@ namespace Apache.Ignite.Tests.Compute
using var client = await server1.ConnectClientAsync();
- var res2 = await client.Compute.ExecuteAsync<string>(
+ IJobExecution<string> exec2 = await
client.Compute.ExecuteAsync<string>(
new[] { server2.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
- var res3 = await client.Compute.ExecuteAsync<string>(
+ IJobExecution<string> exec3 = await
client.Compute.ExecuteAsync<string>(
new[] { server3.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
- Assert.AreEqual("s1", res2);
- Assert.AreEqual("s1", res3);
+ Assert.AreEqual("s1", await exec2.GetResultAsync());
+ Assert.AreEqual("s1", await exec3.GetResultAsync());
Assert.AreEqual(new[] { ClientOp.ComputeExecute,
ClientOp.ComputeExecute }, server1.ClientOps);
Assert.IsEmpty(server2.ClientOps);
@@ -105,9 +105,12 @@ namespace Apache.Ignite.Tests.Compute
for (int i = 0; i < 100; i++)
{
var node = i % 2 == 0 ? server1.Node : server2.Node;
- var res = await client.Compute.ExecuteAsync<string>(
+
+ IJobExecution<string> jobExecution = await
client.Compute.ExecuteAsync<string>(
new[] { node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
+ string res = await jobExecution.GetResultAsync();
+
nodeNames.Add(res);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 145357c8b7..838265a57d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Tests.Compute
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Table;
+ using Internal.Compute;
using Internal.Network;
using Internal.Proto;
using Network;
@@ -86,14 +87,15 @@ namespace Apache.Ignite.Tests.Compute
var res1 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(0), Units, NodeNameJob, "-", 11);
var res2 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, NodeNameJob, ":", 22);
- Assert.AreEqual(PlatformTestNodeRunner + "-_11", res1);
- Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", res2);
+ Assert.AreEqual(PlatformTestNodeRunner + "-_11", await
res1.GetResultAsync());
+ Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", await
res2.GetResultAsync());
}
[Test]
public async Task TestExecuteOnRandomNode()
{
- var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+ var res = await jobExecution.GetResultAsync();
var expectedNodeNames = Enumerable.Range(1, 4)
.Select(x => x == 1 ? PlatformTestNodeRunner :
PlatformTestNodeRunner + "_" + x)
@@ -103,10 +105,10 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
- public void TestExecuteResultTypeMismatchThrowsInvalidCastException()
+ public async Task
TestExecuteResultTypeMismatchThrowsInvalidCastException()
{
- Assert.ThrowsAsync<InvalidCastException>(async () =>
- await Client.Compute.ExecuteAsync<Guid>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob));
+ var jobExecution = await Client.Compute.ExecuteAsync<Guid>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+ Assert.ThrowsAsync<InvalidCastException>(async () => await
jobExecution.GetResultAsync());
}
[Test]
@@ -114,13 +116,13 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await GetNodeAsync(0);
- IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
+ IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
var res = await taskMap[nodes[0]];
Assert.AreEqual(1, taskMap.Count);
Assert.AreSame(nodes[0], taskMap.Keys.Single());
- Assert.AreEqual(PlatformTestNodeRunner + "123", res);
+ Assert.AreEqual(PlatformTestNodeRunner + "123", await
res.GetResultAsync());
}
[Test]
@@ -128,7 +130,7 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await Client.GetClusterNodesAsync();
- IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
+ IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
var res1 = await taskMap[nodes[0]];
var res2 = await taskMap[nodes[1]];
var res3 = await taskMap[nodes[2]];
@@ -136,10 +138,10 @@ namespace Apache.Ignite.Tests.Compute
Assert.AreEqual(4, taskMap.Count);
- Assert.AreEqual(nodes[0].Name + "123", res1);
- Assert.AreEqual(nodes[1].Name + "123", res2);
- Assert.AreEqual(nodes[2].Name + "123", res3);
- Assert.AreEqual(nodes[3].Name + "123", res4);
+ Assert.AreEqual(nodes[0].Name + "123", await
res1.GetResultAsync());
+ Assert.AreEqual(nodes[1].Name + "123", await
res2.GetResultAsync());
+ Assert.AreEqual(nodes[2].Name + "123", await
res3.GetResultAsync());
+ Assert.AreEqual(nodes[3].Name + "123", await
res4.GetResultAsync());
}
[Test]
@@ -147,7 +149,7 @@ namespace Apache.Ignite.Tests.Compute
{
var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ConcatJob, 1.1, Guid.Empty, "3", null);
- Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null",
res);
+ Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null",
await res.GetResultAsync());
}
[Test]
@@ -155,14 +157,14 @@ namespace Apache.Ignite.Tests.Compute
{
var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ConcatJob, args: null);
- Assert.IsNull(res);
+ Assert.IsNull(await res.GetResultAsync());
}
[Test]
- public void TestJobErrorPropagatesToClientWithClassAndMessage()
+ public async Task TestJobErrorPropagatesToClientWithClassAndMessage()
{
- var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused"));
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused");
+ var ex = Assert.ThrowsAsync<IgniteException>(async () => await
jobExecution.GetResultAsync());
StringAssert.Contains("Custom job error", ex!.Message);
@@ -230,7 +232,8 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await Client.GetClusterNodesAsync();
var str = expectedStr ?? val.ToString()!.Replace("E+", "E");
- var res = await Client.Compute.ExecuteAsync<object>(nodes,
Units, EchoJob, val, str);
+ IJobExecution<object> resExec = await
Client.Compute.ExecuteAsync<object>(nodes, Units, EchoJob, val, str);
+ object res = await resExec.GetResultAsync();
Assert.AreEqual(val, res);
}
@@ -268,9 +271,9 @@ namespace Apache.Ignite.Tests.Compute
var nodeName = nodeIdx == 1 ? string.Empty : "_" + nodeIdx;
var expectedNodeName = PlatformTestNodeRunner + nodeName;
- Assert.AreEqual(expectedNodeName, resNodeName);
- Assert.AreEqual(expectedNodeName, resNodeName2);
- Assert.AreEqual(expectedNodeName, resNodeName3);
+ Assert.AreEqual(expectedNodeName, await
resNodeName.GetResultAsync());
+ Assert.AreEqual(expectedNodeName, await
resNodeName2.GetResultAsync());
+ Assert.AreEqual(expectedNodeName, await
resNodeName3.GetResultAsync());
// We only connect to 2 of 4 nodes because of different auth
settings.
if (nodeIdx < 3)
@@ -306,17 +309,22 @@ namespace Apache.Ignite.Tests.Compute
{
// Create table and use it in ExecuteColocated.
var nodes = await GetNodeAsync(0);
- var tableName = await Client.Compute.ExecuteAsync<string>(nodes,
Units, CreateTableJob, "drop_me");
+ var tableNameExec = await
Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, "drop_me");
+ var tableName = await tableNameExec.GetResultAsync();
try
{
var keyTuple = new IgniteTuple { [KeyCol] = 1L };
- var resNodeName = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
+ var resNodeNameExec = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
+ var resNodeName = await resNodeNameExec.GetResultAsync();
// Drop table and create a new one with a different ID, then
execute a computation again.
// This should update the cached table and complete the
computation successfully.
- await Client.Compute.ExecuteAsync<string>(nodes, Units,
DropTableJob, tableName);
- await Client.Compute.ExecuteAsync<string>(nodes, Units,
CreateTableJob, tableName);
+ var dropExec = await
Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName);
+ await dropExec.GetResultAsync();
+
+ var createExec = await
Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, tableName);
+ await createExec.GetResultAsync();
if (forceLoadAssignment)
{
@@ -324,21 +332,22 @@ namespace Apache.Ignite.Tests.Compute
table.SetFieldValue("_partitionAssignment", null);
}
- var resNodeName2 = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
+ var resNodeName2Exec = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
+ var resNodeName2 = await resNodeName2Exec.GetResultAsync();
Assert.AreEqual(resNodeName, resNodeName2);
}
finally
{
- await Client.Compute.ExecuteAsync<string>(nodes, Units,
DropTableJob, tableName);
+ await (await Client.Compute.ExecuteAsync<string>(nodes, Units,
DropTableJob, tableName)).GetResultAsync();
}
}
[Test]
- public void
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
+ public async Task
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
{
- var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), Units, ExceptionJob, "foo-bar"));
+ var jobExecution = await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), Units, ExceptionJob, "foo-bar");
+ var ex = Assert.ThrowsAsync<IgniteException>(async () => await
jobExecution.GetResultAsync());
Assert.AreEqual("Test exception: foo-bar", ex!.Message);
Assert.IsNotNull(ex.InnerException);
@@ -353,10 +362,10 @@ namespace Apache.Ignite.Tests.Compute
}
[Test]
- public void TestCheckedExceptionInJobPropagatesToClient()
+ public async Task TestCheckedExceptionInJobPropagatesToClient()
{
- var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar"));
+ var jobExecution = await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar");
+ var ex = Assert.ThrowsAsync<IgniteException>(async () => await
jobExecution.GetResultAsync());
Assert.AreEqual("TestCheckedEx: foo-bar", ex!.Message);
Assert.IsNotNull(ex.InnerException);
@@ -377,18 +386,18 @@ namespace Apache.Ignite.Tests.Compute
using var client = await server.ConnectClientAsync();
var res = await client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), units, FakeServer.GetDetailsJob);
- StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
res);
+ StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
await res.GetResultAsync());
// Lazy enumerable.
var res2 = await client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), units.Reverse(), FakeServer.GetDetailsJob);
- StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest",
res2);
+ StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest",
await res2.GetResultAsync());
// Colocated.
var keyTuple = new IgniteTuple { ["ID"] = 1 };
var res3 = await client.Compute.ExecuteColocatedAsync<string>(
FakeServer.ExistingTableName, keyTuple, units,
FakeServer.GetDetailsJob);
- StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
res3);
+ StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
await res3.GetResultAsync());
}
[Test]
@@ -466,7 +475,8 @@ namespace Apache.Ignite.Tests.Compute
using var client = await IgniteClient.StartAsync(GetConfig());
const int sleepMs = 3000;
- var jobTask = client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, sleepMs);
+ var jobExecution = await client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, sleepMs);
+ var jobTask = jobExecution.GetResultAsync();
// Wait a bit and close the connection.
await Task.Delay(10);
@@ -476,6 +486,93 @@ namespace Apache.Ignite.Tests.Compute
Assert.AreEqual("Connection closed.", ex!.Message);
}
+ [Test]
+ public async Task TestJobExecutionStatusExecuting()
+ {
+ const int sleepMs = 3000;
+ var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, sleepMs);
+
+ await AssertJobStatus(jobExecution, JobState.Executing,
beforeStart);
+ }
+
+ [Test]
+ public async Task TestJobExecutionStatusCompleted()
+ {
+ const int sleepMs = 1;
+ var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, sleepMs);
+ await jobExecution.GetResultAsync();
+
+ await AssertJobStatus(jobExecution, JobState.Completed,
beforeStart);
+ }
+
+ [Test]
+ public async Task TestJobExecutionStatusFailed()
+ {
+ var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, ErrorJob, "unused");
+ Assert.CatchAsync(async () => await jobExecution.GetResultAsync());
+
+ await AssertJobStatus(jobExecution, JobState.Failed, beforeStart);
+ }
+
+ [Test]
+ public async Task TestJobExecutionStatusNull()
+ {
+ var fakeJobExecution = new JobExecution<int>(
+ Guid.NewGuid(), Task.FromException<(int, JobStatus)>(new
Exception("x")), (Compute)Client.Compute);
+
+ var status = await fakeJobExecution.GetStatusAsync();
+
+ Assert.IsNull(status);
+ }
+
+ [Test]
+ public async Task TestJobExecutionCancel()
+ {
+ const int sleepMs = 5000;
+ var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, sleepMs);
+ await jobExecution.CancelAsync();
+
+ await AssertJobStatus(jobExecution, JobState.Canceled,
beforeStart);
+ }
+
+ [Test]
+ public async Task TestChangePriority()
+ {
+ var jobExecution = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, SleepJob, 5000);
+ var res = await jobExecution.ChangePriorityAsync(123);
+
+ // Job exists, but is already executing.
+ Assert.IsFalse(res);
+ }
+
+ private static async Task AssertJobStatus<T>(IJobExecution<T>
jobExecution, JobState state, Instant beforeStart)
+ {
+ JobStatus? status = await jobExecution.GetStatusAsync();
+
+ Assert.IsNotNull(status);
+ Assert.AreEqual(jobExecution.Id, status!.Id);
+ Assert.AreEqual(state, status.State);
+ Assert.Greater(status.CreateTime, beforeStart);
+ Assert.Greater(status.StartTime, status.CreateTime);
+
+ if (state is JobState.Canceled or JobState.Completed or
JobState.Failed)
+ {
+ Assert.Greater(status.FinishTime, status.StartTime);
+ }
+ else
+ {
+ Assert.IsNull(status.FinishTime);
+ }
+ }
+
private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
(await Client.GetClusterNodesAsync()).OrderBy(n =>
n.Name).Skip(index).Take(1).ToList();
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 493a6ab897..dc1a974633 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -299,7 +299,19 @@ namespace Apache.Ignite.Tests
case ClientOp.ComputeExecuteColocated:
{
using var pooledArrayBuffer = ComputeExecute(reader,
colocated: opCode == ClientOp.ComputeExecuteColocated);
- Send(handler, requestId, ReadOnlyMemory<byte>.Empty);
+
+ using var resWriter = new PooledArrayBuffer();
+
+ var rw = resWriter.MessageWriter;
+ if (opCode == ClientOp.ComputeExecuteColocated)
+ {
+ // Schema version.
+ rw.Write(1);
+ }
+
+ rw.Write(Guid.NewGuid());
+
+ Send(handler, requestId, resWriter);
Send(handler, requestId, pooledArrayBuffer,
isNotification: true);
continue;
}
@@ -656,6 +668,14 @@ namespace Apache.Ignite.Tests
writer.Write(builder.Build().Span);
+ // Status
+ writer.Write(Guid.NewGuid());
+ writer.Write(0); // State.
+ writer.Write(0L); // Create time.
+ writer.Write(0);
+ writer.WriteNil(); // Start time.
+ writer.WriteNil(); // Finish time.
+
return arrayBufferWriter;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
index cba89ada6a..2c24bb3ebb 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
@@ -50,12 +50,14 @@ public class PartitionAwarenessRealClusterTests :
IgniteTestsBase
{
var keyTuple = new IgniteTuple { ["KEY"] = key };
- var primaryNodeName = await
client.Compute.ExecuteColocatedAsync<string>(
+ var primaryNodeNameExec = await
client.Compute.ExecuteColocatedAsync<string>(
TableName,
keyTuple,
Array.Empty<DeploymentUnit>(),
ComputeTests.NodeNameJob);
+ var primaryNodeName = await primaryNodeNameExec.GetResultAsync();
+
if (primaryNodeName.EndsWith("_3", StringComparison.Ordinal) ||
primaryNodeName.EndsWith("_4", StringComparison.Ordinal))
{
// Skip nodes without direct client connection.
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index 4539084c7c..3e0d6282b4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -194,13 +194,15 @@ public class ColocationHashTests : IgniteTestsBase
using var writer = ProtoCommon.GetMessageWriter();
var clientColocationHash = ser.Write(writer, null, schema, key);
- var serverColocationHash = await Client.Compute.ExecuteAsync<int>(
+ var serverColocationHashExec = await
Client.Compute.ExecuteAsync<int>(
clusterNodes,
Array.Empty<DeploymentUnit>(),
TableRowColocationHashJob,
tableName,
i);
+ var serverColocationHash = await
serverColocationHashExec.GetResultAsync();
+
Assert.AreEqual(serverColocationHash, clientColocationHash,
key.ToString());
}
}
@@ -327,7 +329,7 @@ public class ColocationHashTests : IgniteTestsBase
{
var nodes = await Client.GetClusterNodesAsync();
- return await Client.Compute.ExecuteAsync<int>(
+ IJobExecution<int> jobExecution = await
Client.Compute.ExecuteAsync<int>(
nodes,
Array.Empty<DeploymentUnit>(),
ColocationHashJob,
@@ -335,6 +337,8 @@ public class ColocationHashTests : IgniteTestsBase
bytes,
timePrecision,
timestampPrecision);
+
+ return await jobExecution.GetResultAsync();
}
private record TestIndexProvider(Func<int, int> ColumnOrderDelegate, int
HashedColumnCount) : IHashedColumnIndexProvider
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index b5ff24bccf..f164946556 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -292,8 +292,11 @@ public class SchemaSynchronizationTest : IgniteTestsBase
break;
case TestMode.Compute:
- await Client.Compute.ExecuteColocatedAsync<string, Poco>(
+ var jobExecution = await
Client.Compute.ExecuteColocatedAsync<string, Poco>(
table.Name, new Poco(1, "foo"),
Array.Empty<DeploymentUnit>(), ComputeTests.NodeNameJob);
+
+ await jobExecution.GetResultAsync();
+
break;
default:
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 7e26b024f8..78f7c130b2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -129,6 +129,21 @@ namespace Apache.Ignite
/// <summary>
/// SQL script (<see cref="ISql.ExecuteScriptAsync"/>).
/// </summary>
- SqlExecuteScript
+ SqlExecuteScript,
+
+ /// <summary>
+ /// Get status of a compute job (<see
cref="IJobExecution{T}.GetStatusAsync"/>).
+ /// </summary>
+ ComputeGetStatus,
+
+ /// <summary>
+ /// Cancel compute job (<see cref="IJobExecution{T}.CancelAsync"/>).
+ /// </summary>
+ ComputeCancel,
+
+ /// <summary>
+ /// Change compute job priority (<see
cref="IJobExecution{T}.ChangePriorityAsync"/>).
+ /// </summary>
+ ComputeChangePriority
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 5175b475cf..d8b52db983 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -36,7 +36,7 @@ public interface ICompute
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteAsync<T>(
+ Task<IJobExecution<T>> ExecuteAsync<T>(
IEnumerable<IClusterNode> nodes,
IEnumerable<DeploymentUnit> units,
string jobClassName,
@@ -52,7 +52,7 @@ public interface ICompute
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteColocatedAsync<T>(
+ Task<IJobExecution<T>> ExecuteColocatedAsync<T>(
string tableName,
IIgniteTuple key,
IEnumerable<DeploymentUnit> units,
@@ -70,7 +70,7 @@ public interface ICompute
/// <typeparam name="T">Job result type.</typeparam>
/// <typeparam name="TKey">Key type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteColocatedAsync<T, TKey>(
+ Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
string tableName,
TKey key,
IEnumerable<DeploymentUnit> units,
@@ -87,7 +87,7 @@ public interface ICompute
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A map of <see cref="Task"/> representing the asynchronous
operation for every node.</returns>
- IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+ IDictionary<IClusterNode, Task<IJobExecution<T>>> BroadcastAsync<T>(
IEnumerable<IClusterNode> nodes,
IEnumerable<DeploymentUnit> units,
string jobClassName,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
new file mode 100644
index 0000000000..5d050a1c2a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Job control object, provides information about the job execution process
and result, allows cancelling the job.
+/// </summary>
+/// <typeparam name="T">Job result type.</typeparam>
+public interface IJobExecution<T>
+{
+ /// <summary>
+ /// Gets the job ID.
+ /// </summary>
+ Guid Id { get; }
+
+ /// <summary>
+ /// Gets the job execution result.
+ /// </summary>
+ /// <returns>Job execution result.</returns>
+ Task<T> GetResultAsync();
+
+ /// <summary>
+ /// Gets the job execution status. Can be <c>null</c> if the job status no
longer exists due to exceeding the retention time limit.
+ /// </summary>
+ /// <returns>
+ /// Job execution status. Can be <c>null</c> if the job status no longer
exists due to exceeding the retention time limit.
+ /// </returns>
+ Task<JobStatus?> GetStatusAsync();
+
+ /// <summary>
+ /// Cancels the job execution.
+ /// </summary>
+ /// <returns>
+ /// Returns <c>true</c> if the job was successfully cancelled,
<c>false</c> if the job has already finished,
+ /// <c>null</c> if the job was not found (no longer exists due to
exceeding the retention time limit).
+ /// </returns>
+ Task<bool?> CancelAsync();
+
+ /// <summary>
+ /// Changes the job priority. After priority change the job will be the
last in the queue of jobs with the same priority.
+ /// </summary>
+ /// <param name="priority">New priority.</param>
+ /// <returns>
+ /// Returns <c>true</c> if the priority was successfully changed,
+ /// <c>false</c> when the priority couldn't be changed (job is already
executing or completed),
+ /// <c>null</c> if the job was not found (no longer exists due to
exceeding the retention time limit).
+ /// </returns>
+ Task<bool?> ChangePriorityAsync(int priority);
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
similarity index 51%
copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java
copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
index 4c7d18a4cc..ada84bf8e6 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
@@ -15,39 +15,40 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+namespace Apache.Ignite.Compute;
-/**
- * Compute job's state enum.
- */
-public enum JobState {
- /**
- * The job is submitted and waiting for an execution start.
- */
- QUEUED,
+/// <summary>
+/// Compute job state.
+/// </summary>
+public enum JobState
+{
+ /// <summary>
+ /// The job is submitted and waiting for an execution start.
+ /// </summary>
+ Queued,
- /**
- * The job is being executed.
- */
- EXECUTING,
+ /// <summary>
+ /// The job is being executed.
+ /// </summary>
+ Executing,
- /**
- * The job was unexpectedly terminated during execution.
- */
- FAILED,
+ /// <summary>
+ /// The job was unexpectedly terminated during execution.
+ /// </summary>
+ Failed,
- /**
- * The job was executed successfully and the execution result was returned.
- */
- COMPLETED,
+ /// <summary>
+ /// The job was executed successfully and the execution result was
returned.
+ /// </summary>
+ Completed,
- /**
- * The job has received the cancel command, but it is still running.
- */
- CANCELING,
+ /// <summary>
+ /// The job has received the cancel command, but is still running.
+ /// </summary>
+ Canceling,
- /**
- * The job was successfully cancelled.
- */
- CANCELED;
+ /// <summary>
+ /// The job was successfully cancelled.
+ /// </summary>
+ Canceled
}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
similarity index 55%
copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java
copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
index 4c7d18a4cc..b5bb9232d8 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
@@ -15,39 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+namespace Apache.Ignite.Compute;
-/**
- * Compute job's state enum.
- */
-public enum JobState {
- /**
- * The job is submitted and waiting for an execution start.
- */
- QUEUED,
-
- /**
- * The job is being executed.
- */
- EXECUTING,
-
- /**
- * The job was unexpectedly terminated during execution.
- */
- FAILED,
-
- /**
- * The job was executed successfully and the execution result was returned.
- */
- COMPLETED,
-
- /**
- * The job has received the cancel command, but it is still running.
- */
- CANCELING,
+using System;
+using NodaTime;
- /**
- * The job was successfully cancelled.
- */
- CANCELED;
-}
+/// <summary>
+/// Compute job status.
+/// </summary>
+/// <param name="Id">Job ID.</param>
+/// <param name="State">State.</param>
+/// <param name="CreateTime">Create time.</param>
+/// <param name="StartTime">Start time (<c>null</c> when not yet
started).</param>
+/// <param name="FinishTime">Finish time (<c>null</c> when not yet
finished).</param>
+public sealed record JobStatus(
+ Guid Id,
+ JobState State,
+ Instant CreateTime,
+ Instant? StartTime,
+ Instant? FinishTime);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 5d938590d0..208e4ee04d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -60,7 +60,7 @@ namespace Apache.Ignite.Internal.Compute
}
/// <inheritdoc/>
- public async Task<T> ExecuteAsync<T>(
+ public async Task<IJobExecution<T>> ExecuteAsync<T>(
IEnumerable<IClusterNode> nodes,
IEnumerable<DeploymentUnit> units,
string jobClassName,
@@ -76,7 +76,7 @@ namespace Apache.Ignite.Internal.Compute
}
/// <inheritdoc/>
- public async Task<T> ExecuteColocatedAsync<T>(
+ public async Task<IJobExecution<T>> ExecuteColocatedAsync<T>(
string tableName,
IIgniteTuple key,
IEnumerable<DeploymentUnit> units,
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Internal.Compute
.ConfigureAwait(false);
/// <inheritdoc/>
- public async Task<T> ExecuteColocatedAsync<T, TKey>(
+ public async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
string tableName,
TKey key,
IEnumerable<DeploymentUnit> units,
@@ -109,7 +109,7 @@ namespace Apache.Ignite.Internal.Compute
.ConfigureAwait(false);
/// <inheritdoc/>
- public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+ public IDictionary<IClusterNode, Task<IJobExecution<T>>>
BroadcastAsync<T>(
IEnumerable<IClusterNode> nodes,
IEnumerable<DeploymentUnit> units,
string jobClassName,
@@ -119,12 +119,12 @@ namespace Apache.Ignite.Internal.Compute
IgniteArgumentCheck.NotNull(jobClassName);
IgniteArgumentCheck.NotNull(units);
- var res = new Dictionary<IClusterNode, Task<T>>();
+ var res = new Dictionary<IClusterNode, Task<IJobExecution<T>>>();
var units0 = units as ICollection<DeploymentUnit> ??
units.ToList(); // Avoid multiple enumeration.
foreach (var node in nodes)
{
- var task = ExecuteOnNodes<T>(new[] { node }, units0,
jobClassName, args);
+ Task<IJobExecution<T>> task = ExecuteOnNodes<T>(new[] { node
}, units0, jobClassName, args);
res[node] = task;
}
@@ -135,6 +135,59 @@ namespace Apache.Ignite.Internal.Compute
/// <inheritdoc/>
public override string ToString() =>
IgniteToStringBuilder.Build(GetType());
+ /// <summary>
+ /// Gets the job status.
+ /// </summary>
+ /// <param name="jobId">Job ID.</param>
+ /// <returns>Status.</returns>
+ internal async Task<JobStatus?> GetJobStatusAsync(Guid jobId)
+ {
+ using var writer = ProtoCommon.GetMessageWriter();
+ writer.MessageWriter.Write(jobId);
+
+ using var res = await
_socket.DoOutInOpAsync(ClientOp.ComputeGetStatus, writer).ConfigureAwait(false);
+ return Read(res.GetReader());
+
+ JobStatus? Read(MsgPackReader reader) => reader.TryReadNil() ?
null : ReadJobStatus(reader);
+ }
+
+ /// <summary>
+ /// Cancels the job.
+ /// </summary>
+ /// <param name="jobId">Job id.</param>
+ /// <returns>
+ /// <c>true</c> when the job is cancelled, <c>false</c> when the job
couldn't be cancelled
+ /// (either it's not yet started, or it's already completed), or <c>
null</c> if there's no job with the specified id.
+ /// </returns>
+ internal async Task<bool?> CancelJobAsync(Guid jobId)
+ {
+ using var writer = ProtoCommon.GetMessageWriter();
+ writer.MessageWriter.Write(jobId);
+
+ using var res = await
_socket.DoOutInOpAsync(ClientOp.ComputeCancel, writer).ConfigureAwait(false);
+ return res.GetReader().ReadBooleanNullable();
+ }
+
+ /// <summary>
+ /// Changes the job priority. After priority change the job will be
the last in the queue of jobs with the same priority.
+ /// </summary>
+ /// <param name="jobId">Job id.</param>
+ /// <param name="priority">New priority.</param>
+ /// <returns>
+ /// Returns <c>true</c> if the priority was successfully changed,
+ /// <c>false</c> when the priority couldn't be changed (job is already
executing or completed),
+ /// <c>null</c> if the job was not found (no longer exists due to
exceeding the retention time limit).
+ /// </returns>
+ internal async Task<bool?> ChangeJobPriorityAsync(Guid jobId, int
priority)
+ {
+ using var writer = ProtoCommon.GetMessageWriter();
+ writer.MessageWriter.Write(jobId);
+ writer.MessageWriter.Write(priority);
+
+ using var res = await
_socket.DoOutInOpAsync(ClientOp.ComputeChangePriority,
writer).ConfigureAwait(false);
+ return res.GetReader().ReadBooleanNullable();
+ }
+
[SuppressMessage("Security", "CA5394:Do not use insecure randomness",
Justification = "Secure random is not required here.")]
private static IClusterNode GetRandomNode(ICollection<IClusterNode>
nodes)
{
@@ -198,7 +251,47 @@ namespace Apache.Ignite.Internal.Compute
});
}
- private async Task<T> ExecuteOnNodes<T>(
+ private static JobStatus ReadJobStatus(MsgPackReader reader)
+ {
+ var id = reader.ReadGuid();
+ var state = (JobState)reader.ReadInt32();
+ var createTime = reader.ReadInstantNullable();
+ var startTime = reader.ReadInstantNullable();
+ var endTime = reader.ReadInstantNullable();
+
+ return new JobStatus(id, state, createTime.GetValueOrDefault(),
startTime, endTime);
+ }
+
+ private IJobExecution<T> GetJobExecution<T>(PooledBuffer
computeExecuteResult, bool readSchema)
+ {
+ var reader = computeExecuteResult.GetReader();
+
+ if (readSchema)
+ {
+ _ = reader.ReadInt32();
+ }
+
+ var jobId = reader.ReadGuid();
+ var resultTask =
GetResult((NotificationHandler)computeExecuteResult.Metadata!);
+
+ return new JobExecution<T>(jobId, resultTask, this);
+
+ static async Task<(T, JobStatus)> GetResult(NotificationHandler
handler)
+ {
+ using var notificationRes = await
handler.Task.ConfigureAwait(false);
+ return Read(notificationRes.GetReader());
+ }
+
+ static (T, JobStatus) Read(MsgPackReader reader)
+ {
+ var res = (T)reader.ReadObjectFromBinaryTuple()!;
+ var status = ReadJobStatus(reader);
+
+ return (res, status);
+ }
+ }
+
+ private async Task<IJobExecution<T>> ExecuteOnNodes<T>(
ICollection<IClusterNode> nodes,
IEnumerable<DeploymentUnit> units,
string jobClassName,
@@ -213,9 +306,7 @@ namespace Apache.Ignite.Internal.Compute
ClientOp.ComputeExecute, writer,
PreferredNode.FromName(node.Name), expectNotifications: true)
.ConfigureAwait(false);
- var notificationHandler = (NotificationHandler)res.Metadata!;
- using var notificationRes = await
notificationHandler.Task.ConfigureAwait(false);
- return Read(notificationRes);
+ return GetJobExecution<T>(res, readSchema: false);
void Write()
{
@@ -231,13 +322,6 @@ namespace Apache.Ignite.Internal.Compute
w.WriteObjectCollectionAsBinaryTuple(args);
}
-
- static T Read(in PooledBuffer buf)
- {
- var reader = buf.GetReader();
-
- return (T)reader.ReadObjectFromBinaryTuple()!;
- }
}
private async Task<Table> GetTableAsync(string tableName)
@@ -261,7 +345,7 @@ namespace Apache.Ignite.Internal.Compute
}
[SuppressMessage("Maintainability", "CA1508:Avoid dead conditional
code", Justification = "False positive")]
- private async Task<T> ExecuteColocatedAsync<T, TKey>(
+ private async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
string tableName,
TKey key,
Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
@@ -292,9 +376,7 @@ namespace Apache.Ignite.Internal.Compute
ClientOp.ComputeExecuteColocated, bufferWriter,
preferredNode, expectNotifications: true)
.ConfigureAwait(false);
- var notificationHandler =
(NotificationHandler)res.Metadata!;
- using var notificationRes = await
notificationHandler.Task.ConfigureAwait(false);
- return Read(notificationRes);
+ return GetJobExecution<T>(res, readSchema: true);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Client.TableIdNotFound)
{
@@ -336,11 +418,6 @@ namespace Apache.Ignite.Internal.Compute
return colocationHash;
}
-
- static T Read(in PooledBuffer buf)
- {
- return (T)buf.GetReader().ReadObjectFromBinaryTuple()!;
- }
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
new file mode 100644
index 0000000000..441f7068cd
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System;
+using System.Threading.Tasks;
+using Ignite.Compute;
+
+/// <summary>
+/// Job execution.
+/// </summary>
+/// <typeparam name="T">Job result type.</typeparam>
+internal sealed record JobExecution<T> : IJobExecution<T>
+{
+ private readonly Task<(T Result, JobStatus Status)> _resultTask;
+
+ private readonly Compute _compute;
+
+ private volatile JobStatus? _finalStatus;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="JobExecution{T}"/> class.
+ /// </summary>
+ /// <param name="id">Job id.</param>
+ /// <param name="resultTask">Result task.</param>
+ /// <param name="compute">Compute.</param>
+ public JobExecution(Guid id, Task<(T Result, JobStatus Status)>
resultTask, Compute compute)
+ {
+ Id = id;
+ _resultTask = resultTask;
+ _compute = compute;
+
+ // Wait for completion in background and cache the status.
+ _ = CacheStatusOnCompletion();
+ }
+
+ /// <inheritdoc/>
+ public Guid Id { get; }
+
+ /// <inheritdoc/>
+ public async Task<T> GetResultAsync()
+ {
+ var (result, _) = await _resultTask.ConfigureAwait(false);
+ return result;
+ }
+
+ /// <inheritdoc/>
+ public async Task<JobStatus?> GetStatusAsync()
+ {
+ var finalStatus = _finalStatus;
+ if (finalStatus != null)
+ {
+ return finalStatus;
+ }
+
+ var status = await
_compute.GetJobStatusAsync(Id).ConfigureAwait(false);
+ if (status is { State: JobState.Completed or JobState.Failed or
JobState.Canceled })
+ {
+ // Can't be transitioned to another state, cache it.
+ _finalStatus = status;
+ }
+
+ return status;
+ }
+
+ /// <inheritdoc/>
+ public async Task<bool?> CancelAsync() =>
+ await _compute.CancelJobAsync(Id).ConfigureAwait(false);
+
+ /// <inheritdoc/>
+ public async Task<bool?> ChangePriorityAsync(int priority) =>
+ await _compute.ChangeJobPriorityAsync(Id,
priority).ConfigureAwait(false);
+
+ private async Task CacheStatusOnCompletion()
+ {
+ var (_, status) = await _resultTask.ConfigureAwait(false);
+
+ _finalStatus = status;
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 213990c209..80b3cecdfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -119,6 +119,15 @@ namespace Apache.Ignite.Internal.Proto
SqlExecScript = 56,
/** SQL parameter metadata. */
- SqlParamMeta = 57
+ SqlParamMeta = 57,
+
+ /** Get compute job status. */
+ ComputeGetStatus = 59,
+
+ /** Cancel compute job. */
+ ComputeCancel = 60,
+
+ /** Change compute job priority. */
+ ComputeChangePriority = 61
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 71c0803cf8..abab50c462 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -54,6 +54,9 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.TupleContainsKey =>
ClientOperationType.TupleContainsKey,
ClientOp.ComputeExecute => ClientOperationType.ComputeExecute,
ClientOp.ComputeExecuteColocated =>
ClientOperationType.ComputeExecute,
+ ClientOp.ComputeGetStatus =>
ClientOperationType.ComputeGetStatus,
+ ClientOp.ComputeCancel => ClientOperationType.ComputeCancel,
+ ClientOp.ComputeChangePriority =>
ClientOperationType.ComputeChangePriority,
ClientOp.SqlExec => ClientOperationType.SqlExecute,
ClientOp.SqlExecScript => ClientOperationType.SqlExecuteScript,
ClientOp.SqlCursorNextPage => null,
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
index e86f3defa1..30357ca7bc 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
@@ -22,6 +22,7 @@ using System.Buffers.Binary;
using System.IO;
using BinaryTuple;
using Ignite.Sql;
+using NodaTime;
/// <summary>
/// MsgPack reader.
@@ -80,6 +81,12 @@ internal ref struct MsgPackReader
var invalid => throw GetInvalidCodeException("bool", invalid)
};
+ /// <summary>
+ /// Reads a nullable boolean value.
+ /// </summary>
+ /// <returns>The value.</returns>
+ public bool ReadBooleanNullable() => TryReadNil() ? default :
ReadBoolean();
+
/// <summary>
/// Reads a short value.
/// </summary>
@@ -214,6 +221,14 @@ internal ref struct MsgPackReader
return UuidSerializer.Read(GetSpan(16));
}
+ /// <summary>
+ /// Reads Instant value.
+ /// </summary>
+ /// <returns>Instant.</returns>
+ public Instant? ReadInstantNullable() => TryReadNil()
+ ? null
+ :
Instant.FromUnixTimeSeconds(ReadInt64()).PlusNanoseconds(ReadInt32());
+
/// <summary>
/// Skips a value.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 925f04afb2..41dcbcfa53 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -58,6 +58,9 @@ namespace Apache.Ignite
ClientOperationType.ComputeExecute => false,
ClientOperationType.SqlExecute => false,
ClientOperationType.SqlExecuteScript => false,
+ ClientOperationType.ComputeCancel => false,
+ ClientOperationType.ComputeChangePriority => false,
+ ClientOperationType.ComputeGetStatus => true,
var unsupported => throw new
NotSupportedException("Unsupported operation type: " + unsupported)
};
}