This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dc962df81b IGNITE-22434 Do not inherit TaskExecution from JobExecution (#4007)
dc962df81b is described below
commit dc962df81bc27746e3e6e0084bde0245620fc9ec
Author: Mikhail <[email protected]>
AuthorDate: Fri Jun 28 19:27:52 2024 +0300
IGNITE-22434 Do not inherit TaskExecution from JobExecution (#4007)
---
.../java/org/apache/ignite/compute/TaskState.java | 65 ++++++++++
.../java/org/apache/ignite/compute/TaskStatus.java | 53 ++++++++
.../apache/ignite/compute/task/TaskExecution.java | 47 +++++++-
.../ClientComputeExecuteMapReduceRequest.java | 3 +-
.../compute/ClientComputeGetStateRequest.java | 13 ++
.../client/compute/ClientJobExecution.java | 40 +++++++
.../client/compute/ClientTaskExecution.java | 11 +-
.../apache/ignite/client/ClientComputeTest.java | 6 +-
.../apache/ignite/client/fakes/FakeCompute.java | 32 ++---
.../ignite/internal/compute/ItMapReduceTest.java | 55 ++++-----
.../internal/compute/AntiHijackJobExecution.java | 2 +-
.../internal/compute/ComputeComponentImpl.java | 2 +-
.../internal/compute/TaskExecutionWrapper.java | 25 +++-
...ecution.java => TaskToJobExecutionWrapper.java} | 44 +++----
.../compute/task/AntiHijackTaskExecution.java | 33 ++++-
.../compute/task/DelegatingTaskExecution.java | 3 +-
.../compute/task/TaskExecutionInternal.java | 31 ++---
.../ignite/internal/compute/JobStateImpl.java | 2 +-
.../internal/compute/JobTaskStatusMapper.java | 84 +++++++++++++
.../{JobStateImpl.java => TaskStateImpl.java} | 42 ++++---
.../testframework/matchers/TaskStateMatcher.java | 133 +++++++++++++++++++++
modules/runner/build.gradle | 1 +
.../runner/app/client/ItThinClientComputeTest.java | 11 +-
23 files changed, 627 insertions(+), 111 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TaskState.java b/modules/api/src/main/java/org/apache/ignite/compute/TaskState.java
new file mode 100644
index 0000000000..c7b9d7cd92
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskState.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task state.
+ */
+public interface TaskState extends Serializable {
+ /**
+ * Returns task ID.
+ *
+ * @return Task ID.
+ */
+ UUID id();
+
+ /**
+ * Returns task status.
+ *
+ * @return Task status.
+ */
+ TaskStatus status();
+
+ /**
+ * Returns task create time.
+ *
+ * @return Task create time.
+ */
+ Instant createTime();
+
+ /**
+ * Returns task start time. {@code null} if the task has not started yet.
+ *
+ * @return Task start time. {@code null} if the task has not started yet.
+ */
+ @Nullable
+ Instant startTime();
+
+ /**
+ * Returns task finish time. {@code null} if the task has not finished yet.
+ *
+ * @return Task finish time. {@code null} if the task has not finished yet.
+ */
+ @Nullable
+ Instant finishTime();
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TaskStatus.java b/modules/api/src/main/java/org/apache/ignite/compute/TaskStatus.java
new file mode 100644
index 0000000000..63988a7b45
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskStatus.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+/**
+ * Compute task's status.
+ */
+public enum TaskStatus {
+ /**
+ * The task is submitted and waiting for an execution start.
+ */
+ QUEUED,
+
+ /**
+ * The task is being executed.
+ */
+ EXECUTING,
+
+ /**
+ * The task was unexpectedly terminated during execution.
+ */
+ FAILED,
+
+ /**
+ * The task was executed successfully and the execution result was returned.
+ */
+ COMPLETED,
+
+ /**
+ * The task has received the cancel command, but it is still running.
+ */
+ CANCELING,
+
+ /**
+ * The task was successfully cancelled.
+ */
+ CANCELED;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
index 280aadfe28..a51805de74 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Task result type.
*/
-public interface TaskExecution<R> extends JobExecution<R> {
+public interface TaskExecution<R> {
/**
* Returns a collection of states of the jobs which are executing under this task. The resulting future is completed only after the
* jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job state has been exceeded.
@@ -50,4 +51,48 @@ public interface TaskExecution<R> extends JobExecution<R> {
.map(state -> state != null ? state.id() : null)
.collect(Collectors.toList()));
}
+
+ /**
+ * Returns task's execution result.
+ *
+ * @return Task's execution result future.
+ */
+ CompletableFuture<R> resultAsync();
+
+ /**
+ * Returns the current state of the task. The task state may be deleted and thus return {@code null} if the time for retaining task
+ * state has been exceeded.
+ *
+ * @return The current state of the task, or {@code null} if the task state no longer exists due to exceeding the retention time limit.
+ */
+ CompletableFuture<@Nullable TaskState> stateAsync();
+
+ /**
+ * Returns the id of the task. The task state may be deleted and thus return {@code null} if the time for retaining task state has been
+ * exceeded.
+ *
+ * @return The id of the task, or {@code null} if the task state no longer exists due to exceeding the retention time limit.
+ */
+ default CompletableFuture<@Nullable UUID> idAsync() {
+ return stateAsync().thenApply(state -> state != null ? state.id() : null);
+ }
+
+ /**
+ * Cancels the task.
+ *
+ * @return The future which will be completed with {@code true} when the task is cancelled, {@code false} when the task couldn't be
+ * cancelled (if it's already completed or in the process of cancelling), or {@code null} if the task no longer exists due to
+ * exceeding the retention time limit.
+ */
+ CompletableFuture<@Nullable Boolean> cancelAsync();
+
+ /**
+ * Changes task priority. After priority change task will be the last in the queue of tasks with the same priority.
+ *
+ * @param newPriority new priority.
+ * @return The future which will be completed with {@code true} when the priority is changed, {@code false} when the priority couldn't
+ * be changed (if the task is already executing or completed), or {@code null} if the task no longer exists due to exceeding the
+ * retention time limit.
+ */
+ CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority);
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 027a0d7dd6..a2ea6396b7 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.compute;
import static org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackArgs;
import static org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packJobState;
+import static org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packTaskState;
import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
import java.util.Collections;
@@ -84,7 +85,7 @@ public class ClientComputeExecuteMapReduceRequest {
execution.statesAsync().whenComplete((states, errStates) ->
notificationSender.sendNotification(w -> {
w.packObjectAsBinaryTuple(val);
- packJobState(w, state);
+ packTaskState(w, state);
packJobStates(w, states);
}, firstNotNull(err, errState, errStates)))
));
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeGetStateRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeGetStateRequest.java
index 258eeefa11..c396a89ddd 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeGetStateRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeGetStateRequest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client.handler.requests.compute;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
@@ -63,4 +64,16 @@ public class ClientComputeGetStateRequest {
out.packInstant(state.finishTime());
}
}
+
+ static void packTaskState(ClientMessagePacker out, @Nullable TaskState state) {
+ if (state == null) {
+ out.packNil();
+ } else {
+ out.packUuid(state.id());
+ out.packInt(state.status().ordinal());
+ out.packInstant(state.createTime());
+ out.packInstant(state.startTime());
+ out.packInstant(state.finishTime());
+ }
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index 4c2e82c28e..8b277294e9 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -25,11 +25,14 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskState;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.compute.JobStateImpl;
+import org.apache.ignite.internal.compute.TaskStateImpl;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -40,6 +43,8 @@ import org.jetbrains.annotations.Nullable;
class ClientJobExecution<R> implements JobExecution<R> {
private static final JobStatus[] JOB_STATUSES = JobStatus.values();
+ private static final TaskStatus[] TASK_STATUSES = TaskStatus.values();
+
private final ReliableChannel ch;
private final CompletableFuture<UUID> jobIdFuture;
@@ -108,6 +113,19 @@ class ClientJobExecution<R> implements JobExecution<R> {
);
}
+ static CompletableFuture<@Nullable TaskState> getTaskState(ReliableChannel
ch, UUID taskId) {
+ // Send the request to any node, the request will be broadcast since
client doesn't know which particular node is running the job
+ // especially in case of colocated execution.
+ return ch.serviceAsync(
+ ClientOp.COMPUTE_GET_STATE,
+ w -> w.out().packUuid(taskId),
+ ClientJobExecution::unpackTaskState,
+ null,
+ null,
+ false
+ );
+ }
+
static CompletableFuture<@Nullable Boolean> cancelJob(ReliableChannel ch,
UUID jobId) {
// Send the request to any node, the request will be broadcast since
client doesn't know which particular node is running the job
// especially in case of colocated execution.
@@ -151,6 +169,20 @@ class ClientJobExecution<R> implements JobExecution<R> {
.build();
}
+ static @Nullable TaskState unpackTaskState(PayloadInputChannel
payloadInputChannel) {
+ ClientMessageUnpacker unpacker = payloadInputChannel.in();
+ if (unpacker.tryUnpackNil()) {
+ return null;
+ }
+ return TaskStateImpl.builder()
+ .id(unpacker.unpackUuid())
+ .status(unpackTaskStatus(unpacker))
+ .createTime(unpacker.unpackInstant())
+ .startTime(unpacker.unpackInstantNullable())
+ .finishTime(unpacker.unpackInstantNullable())
+ .build();
+ }
+
private static @Nullable Boolean unpackBooleanResult(PayloadInputChannel
payloadInputChannel) {
ClientMessageUnpacker unpacker = payloadInputChannel.in();
if (unpacker.tryUnpackNil()) {
@@ -166,4 +198,12 @@ class ClientJobExecution<R> implements JobExecution<R> {
}
throw new IgniteException(PROTOCOL_ERR, "Invalid job status id: " +
id);
}
+
+ private static TaskStatus unpackTaskStatus(ClientMessageUnpacker unpacker)
{
+ int id = unpacker.unpackInt();
+ if (id >= 0 && id < TASK_STATUSES.length) {
+ return TASK_STATUSES[id];
+ }
+ throw new IgniteException(PROTOCOL_ERR, "Invalid task status id: " +
id);
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
index 3057f9f5ac..84b1270557 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.client.compute;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.cancelJob;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.changePriority;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.getJobState;
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.getTaskState;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.unpackJobState;
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.unpackTaskState;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -31,6 +33,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
@@ -51,7 +54,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
private final CompletableFuture<R> resultAsync;
// Local state cache
- private final CompletableFuture<@Nullable JobState> stateFuture = new
CompletableFuture<>();
+ private final CompletableFuture<@Nullable TaskState> stateFuture = new
CompletableFuture<>();
// Local states cache
private final CompletableFuture<List<@Nullable JobState>> statesFutures =
new CompletableFuture<>();
@@ -68,7 +71,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
// Notifications require explicit input close.
try (payloadInputChannel) {
R result = (R)
payloadInputChannel.in().unpackObjectFromBinaryTuple();
-
stateFuture.complete(unpackJobState(payloadInputChannel));
+
stateFuture.complete(unpackTaskState(payloadInputChannel));
statesFutures.complete(unpackJobStates(payloadInputChannel));
return result;
}
@@ -81,11 +84,11 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
}
@Override
- public CompletableFuture<@Nullable JobState> stateAsync() {
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
if (stateFuture.isDone()) {
return stateFuture;
}
- return jobIdFuture.thenCompose(jobId -> getJobState(ch, jobId));
+ return jobIdFuture.thenCompose(jobId -> getTaskState(ch, jobId));
}
@Override
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 8920496146..f450823243 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.compute.JobStatus.FAILED;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
+import static
org.apache.ignite.internal.testframework.matchers.TaskStateMatcher.taskStateWithStatus;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,6 +51,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.version.Version;
@@ -248,7 +250,7 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
assertThat(task.resultAsync(), willBe("s1"));
- assertThat(task.stateAsync(),
willBe(jobStateWithStatus(COMPLETED)));
+ assertThat(task.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.COMPLETED)));
assertThat(task.statesAsync(),
willBe(everyItem(jobStateWithStatus(COMPLETED))));
assertThat("compute task and sub tasks ids must be different",
@@ -266,7 +268,7 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
TaskExecution<Object> execution =
client.compute().submitMapReduce(List.of(), "job");
assertThat(execution.resultAsync(),
willThrowFast(IgniteException.class));
- assertThat(execution.stateAsync(),
willBe(jobStateWithStatus(FAILED)));
+ assertThat(execution.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.FAILED)));
assertThat(execution.statesAsync(),
willBe(everyItem(jobStateWithStatus(FAILED))));
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 8d1d6e6ca6..9fe5fdb74e 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -49,12 +49,14 @@ import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
import org.apache.ignite.internal.compute.JobStateImpl;
+import org.apache.ignite.internal.compute.TaskStateImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.network.ClusterNode;
@@ -72,7 +74,7 @@ public class FakeCompute implements IgniteComputeInternal {
public static volatile @Nullable RuntimeException err;
- private final Map<UUID, JobState> states = new ConcurrentHashMap<>();
+ private final Map<UUID, JobState> jobStates = new ConcurrentHashMap<>();
public static volatile CountDownLatch latch = new CountDownLatch(0);
@@ -177,12 +179,12 @@ public class FakeCompute implements IgniteComputeInternal
{
.createTime(Instant.now())
.startTime(Instant.now())
.build();
- states.put(jobId, state);
+ jobStates.put(jobId, state);
result.whenComplete((r, throwable) -> {
JobStatus status = throwable != null ? FAILED : COMPLETED;
JobState newState =
JobStateImpl.toBuilder(state).status(status).finishTime(Instant.now()).build();
- states.put(jobId, newState);
+ jobStates.put(jobId, newState);
});
return new JobExecution<>() {
@Override
@@ -192,7 +194,7 @@ public class FakeCompute implements IgniteComputeInternal {
@Override
public CompletableFuture<@Nullable JobState> stateAsync() {
- return completedFuture(states.get(jobId));
+ return completedFuture(jobStates.get(jobId));
}
@Override
@@ -220,16 +222,16 @@ public class FakeCompute implements IgniteComputeInternal
{
UUID subJobId1 = UUID.randomUUID();
UUID subJobId2 = UUID.randomUUID();
- states.put(jobId, toState.apply(jobId, EXECUTING));
- states.put(subJobId1, toState.apply(subJobId1, EXECUTING));
- states.put(subJobId2, toState.apply(subJobId2, EXECUTING));
+ jobStates.put(jobId, toState.apply(jobId, EXECUTING));
+ jobStates.put(subJobId1, toState.apply(subJobId1, EXECUTING));
+ jobStates.put(subJobId2, toState.apply(subJobId2, EXECUTING));
result.whenComplete((r, throwable) -> {
JobStatus status = throwable != null ? FAILED : COMPLETED;
- states.put(jobId, toState.apply(jobId, status));
- states.put(subJobId1, toState.apply(subJobId1, status));
- states.put(subJobId2, toState.apply(subJobId2, status));
+ jobStates.put(jobId, toState.apply(jobId, status));
+ jobStates.put(subJobId1, toState.apply(subJobId1, status));
+ jobStates.put(subJobId2, toState.apply(subJobId2, status));
});
return new TaskExecution<>() {
@@ -239,13 +241,13 @@ public class FakeCompute implements IgniteComputeInternal
{
}
@Override
- public CompletableFuture<@Nullable JobState> stateAsync() {
- return completedFuture(states.get(jobId));
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
+ return
completedFuture(TaskStateImpl.toBuilder(jobStates.get(jobId)).build());
}
@Override
public CompletableFuture<List<@Nullable JobState>> statesAsync() {
- return completedFuture(List.of(states.get(subJobId1),
states.get(subJobId2)));
+ return completedFuture(List.of(jobStates.get(subJobId1),
jobStates.get(subJobId2)));
}
@Override
@@ -262,12 +264,12 @@ public class FakeCompute implements IgniteComputeInternal
{
@Override
public CompletableFuture<Collection<JobState>> statesAsync() {
- return completedFuture(states.values());
+ return completedFuture(jobStates.values());
}
@Override
public CompletableFuture<@Nullable JobState> stateAsync(UUID jobId) {
- return completedFuture(states.get(jobId));
+ return completedFuture(jobStates.get(jobId));
}
@Override
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index 834b427fb3..b206938a8a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
-import static org.apache.ignite.compute.JobStatus.COMPLETED;
-import static org.apache.ignite.compute.JobStatus.EXECUTING;
-import static org.apache.ignite.compute.JobStatus.FAILED;
+import static org.apache.ignite.compute.TaskStatus.CANCELED;
+import static org.apache.ignite.compute.TaskStatus.COMPLETED;
+import static org.apache.ignite.compute.TaskStatus.EXECUTING;
+import static org.apache.ignite.compute.TaskStatus.FAILED;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
-import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatusAndCreateTimeStartTimeFinishTime;
+import static
org.apache.ignite.internal.testframework.matchers.TaskStateMatcher.taskStateWithStatusAndCreateTimeStartTimeFinishTime;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -36,8 +36,9 @@ import static org.hamcrest.Matchers.nullValue;
import java.time.Instant;
import java.util.List;
-import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskState;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -68,12 +69,12 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// Given running task.
TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name());
- TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(taskExecution);
+ TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// And states list future is not complete yet.
assertThat(taskExecution.statesAsync().isDone(), is(false));
@@ -86,7 +87,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
assertTaskStateIs(taskExecution, EXECUTING, stateBeforeSplit,
nullValue(Instant.class));
// And states list contains states for 3 running nodes.
- assertJobStates(taskExecution, EXECUTING);
+ assertJobStates(taskExecution, JobStatus.EXECUTING);
// When finish the jobs.
InteractiveJobs.all().finishReturnWorkerNames();
@@ -106,7 +107,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertTaskStateIs(taskExecution, COMPLETED, stateBeforeSplit,
notNullValue(Instant.class));
// And states list contains states for 3 completed jobs.
- assertJobStates(taskExecution, COMPLETED);
+ assertJobStates(taskExecution, JobStatus.COMPLETED);
}
@Test
@@ -117,7 +118,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// When the split job throws an exception.
InteractiveTasks.GlobalApi.throwException();
@@ -138,7 +139,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
TaskExecution<List<String>> taskExecution = startTask(entryNode,
cooperativeCancel ? "NO_INTERRUPT" : "");
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// When cancel the task.
assertThat(taskExecution.cancelAsync(), willBe(true));
@@ -161,7 +162,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// And finish the split job.
finishSplit(taskExecution);
@@ -179,12 +180,12 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// Given running task.
TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name());
- TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(taskExecution);
+ TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// And finish the split job.
finishSplit(taskExecution);
@@ -196,7 +197,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertTaskFailed(taskExecution, FAILED, stateBeforeSplit);
// And states list contains canceled states.
- assertJobStates(taskExecution, CANCELED);
+ assertJobStates(taskExecution, JobStatus.CANCELED);
// And second cancel will fail.
assertThat(taskExecution.cancelAsync(), willBe(false));
@@ -210,7 +211,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
TaskExecution<List<String>> taskExecution = startTask(entryNode);
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// And finish the split job.
finishSplit(taskExecution);
@@ -225,7 +226,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertTaskFailed(taskExecution, FAILED, stateBeforeSplit);
// And states list contains completed states.
- assertJobStates(taskExecution, COMPLETED);
+ assertJobStates(taskExecution, JobStatus.COMPLETED);
}
@ParameterizedTest
@@ -236,12 +237,12 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
// Given running task.
String arg = cooperativeCancel ? "NO_INTERRUPT" : "";
TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name(), arg);
- TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(taskExecution);
+ TestingJobExecution<List<String>> testExecution = new
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
testExecution.assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
// Save state before split.
- JobState stateBeforeSplit = taskExecution.stateAsync().join();
+ TaskState stateBeforeSplit = taskExecution.stateAsync().join();
// And finish the split job.
finishSplit(taskExecution);
@@ -259,7 +260,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
assertTaskFailed(taskExecution, CANCELED, stateBeforeSplit);
// And states list contains completed states.
- assertJobStates(taskExecution, COMPLETED);
+ assertJobStates(taskExecution, JobStatus.COMPLETED);
// And second cancel will fail.
assertThat(taskExecution.cancelAsync(), willBe(false));
@@ -267,7 +268,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
private static TaskExecution<List<String>> startTask(IgniteImpl entryNode,
Object... args) throws InterruptedException {
TaskExecution<List<String>> taskExecution =
entryNode.compute().submitMapReduce(List.of(),
InteractiveTasks.GlobalApi.name(), args);
- new TestingJobExecution<>(taskExecution).assertExecuting();
+ new TestingJobExecution<>(new
TaskToJobExecutionWrapper<>(taskExecution)).assertExecuting();
InteractiveTasks.GlobalApi.assertAlive();
return taskExecution;
}
@@ -277,21 +278,21 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
InteractiveTasks.GlobalApi.finishSplit();
// And wait for states list contains states for 3 running nodes.
- assertJobStates(taskExecution, EXECUTING);
+ assertJobStates(taskExecution, JobStatus.EXECUTING);
}
- private static void assertTaskFailed(TaskExecution<List<String>>
taskExecution, JobStatus status, JobState stateBeforeSplit) {
+ private static void assertTaskFailed(TaskExecution<List<String>>
taskExecution, TaskStatus status, TaskState stateBeforeSplit) {
assertThat(taskExecution.resultAsync(),
willThrow(IgniteException.class));
assertTaskStateIs(taskExecution, status, stateBeforeSplit,
notNullValue(Instant.class));
}
private static void assertTaskStateIs(
TaskExecution<List<String>> taskExecution,
- JobStatus status,
- JobState stateBeforeSplit,
+ TaskStatus status,
+ TaskState stateBeforeSplit,
Matcher<Instant> finishTimeMatcher
) {
- assertThat(taskExecution.stateAsync(),
willBe(jobStateWithStatusAndCreateTimeStartTimeFinishTime(
+ assertThat(taskExecution.stateAsync(),
willBe(taskStateWithStatusAndCreateTimeStartTimeFinishTime(
is(status),
is(stateBeforeSplit.createTime()),
is(stateBeforeSplit.startTime()),
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
index 20901f947d..d68af92a79 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
@@ -59,7 +59,7 @@ public class AntiHijackJobExecution<R> implements
JobExecution<R> {
return preventThreadHijack(execution.changePriorityAsync(newPriority));
}
- protected <T> CompletableFuture<T>
preventThreadHijack(CompletableFuture<T> originalFuture) {
+ private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T>
originalFuture) {
return PublicApiThreading.preventThreadHijack(originalFuture,
asyncContinuationExecutor);
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 97cade907a..f4dc3bd5d7 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -174,7 +174,7 @@ public class ComputeComponentImpl implements
ComputeComponent {
inFlightFutures.registerFuture(taskFuture);
DelegatingTaskExecution<R> result = new
DelegatingTaskExecution<>(taskFuture);
- result.idAsync().thenAccept(jobId ->
executionManager.addExecution(jobId, result));
+ result.idAsync().thenAccept(jobId ->
executionManager.addExecution(jobId, new TaskToJobExecutionWrapper<>(result)));
return result;
} finally {
busyLock.leaveBusy();
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
index d2cabdc8e7..9c2c3bbe81 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.compute;
+import static
org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture;
import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.jetbrains.annotations.Nullable;
@@ -30,11 +32,10 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements
TaskExecution<R> {
+class TaskExecutionWrapper<R> implements TaskExecution<R> {
private final TaskExecution<R> delegate;
TaskExecutionWrapper(TaskExecution<R> delegate) {
- super(delegate);
this.delegate = delegate;
}
@@ -42,4 +43,24 @@ class TaskExecutionWrapper<R> extends JobExecutionWrapper<R>
implements TaskExec
public CompletableFuture<List<@Nullable JobState>> statesAsync() {
return convertToPublicFuture(delegate.statesAsync());
}
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return convertToComputeFuture(delegate.resultAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
+ return convertToPublicFuture(delegate.stateAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return convertToPublicFuture(delegate.cancelAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
+ return
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
similarity index 56%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
copy to
modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
index 20901f947d..a04fa8cc12 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
@@ -18,48 +18,52 @@
package org.apache.ignite.internal.compute;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.compute.task.TaskExecution;
import org.jetbrains.annotations.Nullable;
/**
- * Wrapper around {@link JobExecution} that adds protection against thread
hijacking by users.
+ * Representation of {@link TaskExecution} as {@link JobExecution}.
+ *
+ * @param <R> Job result type.
*/
-public class AntiHijackJobExecution<R> implements JobExecution<R> {
- private final JobExecution<R> execution;
- private final Executor asyncContinuationExecutor;
+public class TaskToJobExecutionWrapper<R> implements JobExecution<R> {
+ private final TaskExecution<R> taskExecution;
- /**
- * Constructor.
- */
- public AntiHijackJobExecution(JobExecution<R> execution, Executor
asyncContinuationExecutor) {
- this.execution = execution;
- this.asyncContinuationExecutor = asyncContinuationExecutor;
+ public TaskToJobExecutionWrapper(TaskExecution<R> taskExecution) {
+ this.taskExecution = taskExecution;
}
@Override
public CompletableFuture<R> resultAsync() {
- return preventThreadHijack(execution.resultAsync());
+ return taskExecution.resultAsync();
}
@Override
public CompletableFuture<@Nullable JobState> stateAsync() {
- return preventThreadHijack(execution.stateAsync());
+ return taskExecution.stateAsync().thenApply(state -> {
+ if (state == null) {
+ return null;
+
+ }
+ return JobStateImpl.builder()
+ .id(state.id())
+ .createTime(state.createTime())
+ .startTime(state.startTime())
+ .finishTime(state.finishTime())
+ .status(JobTaskStatusMapper.toJobStatus(state.status()))
+ .build();
+ });
}
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync() {
- return preventThreadHijack(execution.cancelAsync());
+ return taskExecution.cancelAsync();
}
@Override
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
- return preventThreadHijack(execution.changePriorityAsync(newPriority));
- }
-
- protected <T> CompletableFuture<T>
preventThreadHijack(CompletableFuture<T> originalFuture) {
- return PublicApiThreading.preventThreadHijack(originalFuture,
asyncContinuationExecutor);
+ return taskExecution.changePriorityAsync(newPriority);
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
index c8c9a7d96e..a3ce63c7f4 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
@@ -21,16 +21,19 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.internal.compute.AntiHijackJobExecution;
+import org.apache.ignite.internal.thread.PublicApiThreading;
import org.jetbrains.annotations.Nullable;
/**
* Wrapper around {@link TaskExecution} that adds protection against thread
hijacking by users.
*/
-public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R>
implements TaskExecution<R> {
+public class AntiHijackTaskExecution<R> implements TaskExecution<R> {
private final TaskExecution<R> execution;
+ private final Executor asyncContinuationExecutor;
+
/**
* Constructor.
*
@@ -38,12 +41,36 @@ public class AntiHijackTaskExecution<R> extends
AntiHijackJobExecution<R> implem
* @param asyncContinuationExecutor Executor to which the execution will
be resubmitted.
*/
public AntiHijackTaskExecution(TaskExecution<R> execution, Executor
asyncContinuationExecutor) {
- super(execution, asyncContinuationExecutor);
this.execution = execution;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
}
@Override
public CompletableFuture<List<@Nullable JobState>> statesAsync() {
return preventThreadHijack(execution.statesAsync());
}
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return preventThreadHijack(execution.resultAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
+ return preventThreadHijack(execution.stateAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return preventThreadHijack(execution.cancelAsync());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
+ return preventThreadHijack(execution.changePriorityAsync(newPriority));
+ }
+
+ private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T>
originalFuture) {
+ return PublicApiThreading.preventThreadHijack(originalFuture,
asyncContinuationExecutor);
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
index 071a527ab5..076aa66d39 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.task;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.jetbrains.annotations.Nullable;
@@ -51,7 +52,7 @@ public class DelegatingTaskExecution<R> implements
TaskExecution<R> {
}
@Override
- public CompletableFuture<@Nullable JobState> stateAsync() {
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
return delegate.thenCompose(TaskExecutionInternal::stateAsync);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 7fb3622e9f..0f3ebe7bc8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -20,10 +20,10 @@ package org.apache.ignite.internal.compute.task;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.COMPLETED;
-import static org.apache.ignite.compute.JobStatus.EXECUTING;
-import static org.apache.ignite.compute.JobStatus.FAILED;
+import static org.apache.ignite.compute.TaskStatus.CANCELED;
+import static org.apache.ignite.compute.TaskStatus.EXECUTING;
+import static org.apache.ignite.compute.TaskStatus.FAILED;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
@@ -44,11 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskState;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.compute.task.TaskExecutionContext;
-import org.apache.ignite.internal.compute.JobStateImpl;
+import org.apache.ignite.internal.compute.TaskStateImpl;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -64,7 +66,7 @@ import org.jetbrains.annotations.Nullable;
* @param <R> Task result type.
*/
@SuppressWarnings("unchecked")
-public class TaskExecutionInternal<R> implements JobExecution<R> {
+public class TaskExecutionInternal<R> implements TaskExecution<R> {
private static final IgniteLogger LOG =
Loggers.forClass(TaskExecutionInternal.class);
private final QueueExecution<SplitResult<R>> splitExecution;
@@ -75,7 +77,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
private final CompletableFuture<QueueExecution<R>> reduceExecutionFuture;
- private final AtomicReference<JobState> reduceFailedState = new
AtomicReference<>();
+ private final AtomicReference<TaskState> reduceFailedState = new
AtomicReference<>();
private final AtomicBoolean isCancelled;
@@ -135,12 +137,12 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
private void captureReduceFailure(QueueExecution<R> reduceExecution,
Throwable throwable) {
if (throwable != null) {
// Capture the reduce execution failure reason and time.
- JobStatus status = throwable instanceof CancellationException ?
CANCELED : FAILED;
+ TaskStatus status = throwable instanceof CancellationException ?
CANCELED : FAILED;
JobState state = splitExecution.state();
if (state != null) {
reduceFailedState.set(
- JobStateImpl.toBuilder(state)
+ TaskStateImpl.toBuilder(state)
.status(status)
.finishTime(Instant.now())
.build()
@@ -155,7 +157,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
}
@Override
- public CompletableFuture<@Nullable JobState> stateAsync() {
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
JobState splitState = splitExecution.state();
if (splitState == null) {
// Return null even if the reduce execution can still be retained.
@@ -163,7 +165,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
}
if (splitState.status() != COMPLETED) {
- return completedFuture(splitState);
+ return
completedFuture(TaskStateImpl.toBuilder(splitState).build());
}
// This future is complete when reduce execution job is submitted,
return status from it.
@@ -174,7 +176,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
if (reduceState == null) {
return null;
}
- return JobStateImpl.toBuilder(reduceState)
+ return TaskStateImpl.toBuilder(reduceState)
.id(splitState.id())
.createTime(splitState.createTime())
.startTime(splitState.startTime())
@@ -185,7 +187,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
}
// At this point split is complete but reduce job is not submitted yet.
- return completedFuture(JobStateImpl.toBuilder(splitState)
+ return completedFuture(TaskStateImpl.toBuilder(splitState)
.status(EXECUTING)
.finishTime(null)
.build());
@@ -257,7 +259,8 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
});
}
- CompletableFuture<List<@Nullable JobState>> statesAsync() {
+ @Override
+ public CompletableFuture<List<@Nullable JobState>> statesAsync() {
return executionsFuture.thenCompose(executions -> {
CompletableFuture<JobState>[] stateFutures = executions.stream()
.map(JobExecution::stateAsync)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
index 15fa0390b5..8fcce79ec8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
- * Job status.
+ * Job state implementation.
*/
public class JobStateImpl implements JobState {
private static final long serialVersionUID = 8575969461073736006L;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/compute/JobTaskStatusMapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobTaskStatusMapper.java
new file mode 100644
index 0000000000..a2b89e3889
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobTaskStatusMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.compute.TaskStatus.CANCELED;
+import static org.apache.ignite.compute.TaskStatus.CANCELING;
+import static org.apache.ignite.compute.TaskStatus.COMPLETED;
+import static org.apache.ignite.compute.TaskStatus.EXECUTING;
+import static org.apache.ignite.compute.TaskStatus.FAILED;
+import static org.apache.ignite.compute.TaskStatus.QUEUED;
+
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskStatus;
+
+/**
+ * Maps job statuses to task statuses and back.
+ */
+public class JobTaskStatusMapper {
+
+ /**
+ * Map task status to job status.
+ *
+ * @param taskStatus Task status.
+ * @return Mapped job status.
+ */
+ public static JobStatus toJobStatus(TaskStatus taskStatus) {
+ switch (taskStatus) {
+ case QUEUED:
+ return JobStatus.QUEUED;
+ case EXECUTING:
+ return JobStatus.EXECUTING;
+ case FAILED:
+ return JobStatus.FAILED;
+ case COMPLETED:
+ return JobStatus.COMPLETED;
+ case CANCELING:
+ return JobStatus.CANCELING;
+ case CANCELED:
+ return JobStatus.CANCELED;
+ default:
+ throw new IllegalArgumentException("Unknown task status.");
+ }
+ }
+
+ /**
+ * Map job status to task status.
+ *
+ * @param jobStatus Job status.
+ * @return Mapped task status.
+ */
+ public static TaskStatus toTaskStatus(JobStatus jobStatus) {
+ switch (jobStatus) {
+ case QUEUED:
+ return QUEUED;
+ case EXECUTING:
+ return EXECUTING;
+ case FAILED:
+ return FAILED;
+ case COMPLETED:
+ return COMPLETED;
+ case CANCELING:
+ return CANCELING;
+ case CANCELED:
+ return CANCELED;
+ default:
+ throw new IllegalArgumentException("Unknown job status.");
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/compute/TaskStateImpl.java
similarity index 84%
copy from
modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/compute/TaskStateImpl.java
index 15fa0390b5..3b197f1b68 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStateImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/compute/TaskStateImpl.java
@@ -21,14 +21,15 @@ import java.time.Instant;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskState;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
- * Job status.
+ * Task state implementation.
*/
-public class JobStateImpl implements JobState {
+public class TaskStateImpl implements TaskState {
private static final long serialVersionUID = 8575969461073736006L;
/**
@@ -39,7 +40,7 @@ public class JobStateImpl implements JobState {
/**
* Job status.
*/
- private final JobStatus status;
+ private final TaskStatus status;
/**
* Job create time.
@@ -58,7 +59,7 @@ public class JobStateImpl implements JobState {
@Nullable
private final Instant finishTime;
- private JobStateImpl(Builder builder) {
+ private TaskStateImpl(Builder builder) {
this.id = Objects.requireNonNull(builder.id, "id");
this.status = Objects.requireNonNull(builder.status, "status");
this.createTime = Objects.requireNonNull(builder.createTime,
"createTime");
@@ -91,7 +92,7 @@ public class JobStateImpl implements JobState {
* @return Job status.
*/
@Override
- public JobStatus status() {
+ public TaskStatus status() {
return status;
}
@@ -128,14 +129,28 @@ public class JobStateImpl implements JobState {
}
/**
- * Returns a new builder with the same property values as this JobStatus.
+ * Returns a new builder with the same property values as this TaskState.
*
* @return Builder.
*/
- public static Builder toBuilder(JobState state) {
+ public static Builder toBuilder(TaskState state) {
return new Builder(state);
}
+ /**
+ * Returns a new builder with the same property values as the given JobState, with its status mapped to a task status.
+ *
+ * @return Builder.
+ */
+ public static Builder toBuilder(JobState state) {
+ return new Builder()
+ .id(state.id())
+ .createTime(state.createTime())
+ .finishTime(state.finishTime())
+ .startTime(state.startTime())
+ .status(JobTaskStatusMapper.toTaskStatus(state.status()));
+ }
+
@Override
public String toString() {
return S.toString(this);
@@ -146,7 +161,7 @@ public class JobStateImpl implements JobState {
*/
public static class Builder {
private UUID id;
- private JobStatus status;
+ private TaskStatus status;
private Instant createTime;
@Nullable
private Instant startTime;
@@ -164,7 +179,7 @@ public class JobStateImpl implements JobState {
*
* @param state Job state for copy.
*/
- private Builder(JobState state) {
+ private Builder(TaskState state) {
this.id = state.id();
this.status = state.status();
this.createTime = state.createTime();
@@ -189,7 +204,7 @@ public class JobStateImpl implements JobState {
* @param status Job status.
* @return This builder.
*/
- public Builder status(JobStatus status) {
+ public Builder status(TaskStatus status) {
this.status = status;
return this;
}
@@ -232,9 +247,8 @@ public class JobStateImpl implements JobState {
*
* @return JobState.
*/
- public JobStateImpl build() {
- return new JobStateImpl(this);
+ public TaskStateImpl build() {
+ return new TaskStateImpl(this);
}
}
}
-
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TaskStateMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TaskStateMatcher.java
new file mode 100644
index 0000000000..91ef713490
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/TaskStateMatcher.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import static
org.apache.ignite.internal.testframework.matchers.AnythingMatcher.anything;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.time.Instant;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskState;
+import org.apache.ignite.compute.TaskStatus;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher for {@link TaskState}, the task-level counterpart of {@link JobState}.
+ */
+public class TaskStateMatcher extends TypeSafeMatcher<TaskState> {
+ private final Matcher<TaskStatus> statusMatcher;
+ private final Matcher<Instant> createTimeMatcher;
+ private final Matcher<Instant> startTimeMatcher;
+ private final Matcher<Instant> finishTimeMatcher;
+
+
+ private TaskStateMatcher(
+ Matcher<TaskStatus> statusMatcher,
+ Matcher<Instant> createTimeMatcher,
+ Matcher<Instant> startTimeMatcher,
+ Matcher<Instant> finishTimeMatcher
+ ) {
+ this.statusMatcher = statusMatcher;
+ this.createTimeMatcher = createTimeMatcher;
+ this.startTimeMatcher = startTimeMatcher;
+ this.finishTimeMatcher = finishTimeMatcher;
+ }
+
+ /**
+ * Creates a matcher for matching task state with given status.
+ *
+ * @param status Expected status.
+ * @return Matcher for matching task state with given status.
+ */
+ public static TaskStateMatcher taskStateWithStatus(TaskStatus status) {
+ return taskStateWithStatus(equalTo(status));
+ }
+
+ /**
+ * Creates a matcher for matching task state with given status.
+ *
+ * @param statusMatcher Matcher for matching task state status.
+ * @return Matcher for matching task state with given status.
+ */
+ public static TaskStateMatcher taskStateWithStatus(Matcher<TaskStatus>
statusMatcher) {
+ return new TaskStateMatcher(
+ statusMatcher,
+ notNullValue(Instant.class),
+ anything(),
+ anything()
+ );
+ }
+
+ /**
+ * Creates a matcher for matching task state with given status, create
time, start time, and finish time.
+ *
+ * @param statusMatcher Matcher for matching task state status.
+ * @param createTimeMatcher Matcher for matching task state create time.
+ * @param startTimeMatcher Matcher for matching task state start time.
+ * @param finishTimeMatcher Matcher for matching task state finish time.
+ * @return Matcher for matching task state with given status, create time,
start time, and finish time.
+ */
+ public static TaskStateMatcher
taskStateWithStatusAndCreateTimeStartTimeFinishTime(
+ Matcher<TaskStatus> statusMatcher,
+ Matcher<Instant> createTimeMatcher,
+ Matcher<Instant> startTimeMatcher,
+ Matcher<Instant> finishTimeMatcher
+ ) {
+ return new TaskStateMatcher(
+ statusMatcher,
+ createTimeMatcher,
+ startTimeMatcher,
+ finishTimeMatcher
+ );
+ }
+
+ @Override
+ protected boolean matchesSafely(TaskState status) {
+ return statusMatcher.matches(status.status())
+ && createTimeMatcher.matches(status.createTime())
+ && startTimeMatcher.matches(status.startTime())
+ && finishTimeMatcher.matches(status.finishTime());
+ }
+
+ @Override
+ protected void describeMismatchSafely(TaskState state, Description
mismatchDescription) {
+ mismatchDescription.appendText("status ");
+ statusMatcher.describeMismatch(state.status(), mismatchDescription);
+ mismatchDescription.appendText(", create time ");
+ createTimeMatcher.describeMismatch(state.createTime(),
mismatchDescription);
+ mismatchDescription.appendText(", start time ");
+ startTimeMatcher.describeMismatch(state.startTime(),
mismatchDescription);
+ mismatchDescription.appendText(" and finish time ");
+ finishTimeMatcher.describeMismatch(state.finishTime(),
mismatchDescription);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a TaskState with status ")
+ .appendDescriptionOf(statusMatcher)
+ .appendText(", create time ")
+ .appendDescriptionOf(createTimeMatcher)
+ .appendText(", start time ")
+ .appendDescriptionOf(startTimeMatcher)
+ .appendText(" and finish time ")
+ .appendDescriptionOf(finishTimeMatcher);
+ }
+}
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index d3a710d688..0de5eb81d0 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -130,6 +130,7 @@ dependencies {
integrationTestImplementation project(':ignite-raft-api')
integrationTestImplementation project(':ignite-replicator')
integrationTestImplementation project(':ignite-client')
+ integrationTestImplementation project(':ignite-compute')
integrationTestImplementation project(':ignite-client-handler')
integrationTestImplementation project(':ignite-storage-api')
integrationTestImplementation project(':ignite-storage-api')
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 1260ef5a4f..a12855827d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -29,6 +29,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
+import static
org.apache.ignite.internal.testframework.matchers.TaskStateMatcher.taskStateWithStatus;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
@@ -79,11 +80,13 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.compute.TaskToJobExecutionWrapper;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
@@ -708,7 +711,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
.collect(Collectors.toList());
assertThat(execution.resultAsync(), willBe(allOf(nodeNames)));
- assertThat(execution.stateAsync(),
willBe(jobStateWithStatus(COMPLETED)));
+ assertThat(execution.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.COMPLETED)));
assertThat(execution.statesAsync(),
willBe(everyItem(jobStateWithStatus(COMPLETED))));
assertThat("compute task and sub tasks ids must be different",
@@ -721,14 +724,14 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
.submitMapReduce(List.of(), MapReduceArgsTask.class.getName(),
1, "2", 3.3);
assertThat(execution.resultAsync(), willBe(containsString("1_2_3.3")));
- assertThat(execution.stateAsync(),
willBe(jobStateWithStatus(COMPLETED)));
+ assertThat(execution.stateAsync(),
willBe(taskStateWithStatus(TaskStatus.COMPLETED)));
}
@ParameterizedTest
@ValueSource(classes = {MapReduceExceptionOnSplitTask.class,
MapReduceExceptionOnReduceTask.class})
void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
- IgniteException cause = getExceptionInJobExecutionAsync(
- client().compute().submitMapReduce(List.of(),
taskClass.getName())
+ IgniteException cause = getExceptionInJobExecutionAsync(new
TaskToJobExecutionWrapper<>(
+ client().compute().submitMapReduce(List.of(),
taskClass.getName()))
);
assertThat(cause.getMessage(), containsString("Custom job error"));