This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 54588fdf67 IGNITE-22431 Rename ComputeJobRunner to MapReduceJob (#3923)
54588fdf67 is described below
commit 54588fdf6756d8b1d0b3b179840da96d9c85eb08
Author: Mikhail <[email protected]>
AuthorDate: Mon Jun 17 17:00:05 2024 +0300
IGNITE-22431 Rename ComputeJobRunner to MapReduceJob (#3923)
Rename JobExecutionContext.isInterrupted to isCancelled
Hide builder in JobStatus
Rename ComputeJobRunner to MapReduceJob
Move TaskExecution to task package
---
.../org/apache/ignite/compute/IgniteCompute.java | 1 +
.../apache/ignite/compute/JobExecutionContext.java | 6 +-
.../java/org/apache/ignite/compute/JobStatus.java | 181 +--------------------
.../{ComputeJobRunner.java => MapReduceJob.java} | 8 +-
.../apache/ignite/compute/task/MapReduceTask.java | 2 +-
.../ignite/compute/{ => task}/TaskExecution.java | 4 +-
.../ClientComputeExecuteMapReduceRequest.java | 2 +-
.../internal/client/compute/ClientCompute.java | 2 +-
.../client/compute/ClientJobExecution.java | 3 +-
.../client/compute/ClientTaskExecution.java | 2 +-
.../apache/ignite/client/ClientComputeTest.java | 2 +-
.../apache/ignite/client/fakes/FakeCompute.java | 9 +-
.../ignite/internal/compute/ItComputeBaseTest.java | 2 +-
.../ignite/internal/compute/ItMapReduceTest.java | 2 +-
.../internal/compute/utils/InteractiveTasks.java | 6 +-
.../apache/ignite/internal/compute/MapReduce.java | 6 +-
.../internal/compute/AntiHijackIgniteCompute.java | 2 +-
.../ignite/internal/compute/ComputeComponent.java | 2 +-
.../internal/compute/ComputeComponentImpl.java | 2 +-
.../internal/compute/FailSafeJobExecution.java | 4 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 6 +-
.../internal/compute/JobExecutionContextImpl.java | 2 +-
.../internal/compute/TaskExecutionWrapper.java | 2 +-
.../compute/state/InMemoryComputeStateMachine.java | 5 +-
.../compute/task/AntiHijackTaskExecution.java | 2 +-
.../compute/task/DelegatingTaskExecution.java | 2 +-
.../ignite/internal/compute/task/JobSubmitter.java | 4 +-
.../compute/task/TaskExecutionInternal.java | 33 ++--
.../internal/compute/ComputeComponentImplTest.java | 3 +-
.../compute/JobExecutionContextImplTest.java | 4 +-
.../compute/executor/ComputeExecutorTest.java | 2 +-
.../ignite/internal/compute/JobStatusImpl.java} | 48 +++---
.../runner/app/client/ItThinClientComputeTest.java | 18 +-
33 files changed, 113 insertions(+), 266 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index c4bf268c32..54b23d478c 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 58a037047f..d4d0cace2e 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -31,9 +31,9 @@ public interface JobExecutionContext {
Ignite ignite();
/**
- * Flag indicating whether the job was interrupted.
+ * Flag indicating whether the job was cancelled.
*
- * @return {@code true} when the job was interrupted.
+ * @return {@code true} when the job was cancelled.
*/
- boolean isInterrupted();
+ boolean isCancelled();
}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
index df0fd541cd..0127067c04 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
@@ -19,86 +19,33 @@ package org.apache.ignite.compute;
import java.io.Serializable;
import java.time.Instant;
-import java.util.Objects;
import java.util.UUID;
import org.jetbrains.annotations.Nullable;
/**
* Job status.
*/
-public class JobStatus implements Serializable {
- private static final long serialVersionUID = 8575969461073736006L;
-
- /**
- * Job ID.
- */
- private final UUID id;
-
- /**
- * Job state.
- */
- private final JobState state;
-
- /**
- * Job create time.
- */
- private final Instant createTime;
-
- /**
- * Job start time.
- */
- @Nullable
- private final Instant startTime;
-
- /**
- * Job finish time.
- */
- @Nullable
- private final Instant finishTime;
-
- private JobStatus(Builder builder) {
- this.id = Objects.requireNonNull(builder.id, "id");
- this.state = Objects.requireNonNull(builder.state, "state");
- this.createTime = Objects.requireNonNull(builder.createTime,
"createTime");
- this.startTime = builder.startTime;
- this.finishTime = builder.finishTime;
- }
-
- /**
- * Creates a new builder.
- *
- * @return Builder.
- */
- public static Builder builder() {
- return new Builder();
- }
-
+public interface JobStatus extends Serializable {
/**
* Returns job ID.
*
* @return Job ID.
*/
- public UUID id() {
- return id;
- }
+ UUID id();
/**
* Returns job state.
*
* @return Job state.
*/
- public JobState state() {
- return state;
- }
+ JobState state();
/**
* Returns job create time.
*
* @return Job create time.
*/
- public Instant createTime() {
- return createTime;
- }
+ Instant createTime();
/**
* Returns job start time. {@code null} if the job has not started yet.
@@ -106,9 +53,7 @@ public class JobStatus implements Serializable {
* @return Job start time. {@code null} if the job has not started yet.
*/
@Nullable
- public Instant startTime() {
- return startTime;
- }
+ Instant startTime();
/**
* Returns job finish time. {@code null} if the job has not finished yet.
@@ -116,119 +61,5 @@ public class JobStatus implements Serializable {
* @return Job finish time. {@code null} if the job has not finished yet.
*/
@Nullable
- public Instant finishTime() {
- return finishTime;
- }
-
- /**
- * Returns a new builder with the same property values as this JobStatus.
- *
- * @return Builder.
- */
- public Builder toBuilder() {
- return new Builder(this);
- }
-
- @Override
- public String toString() {
- return "JobStatus{"
- + "id=" + id
- + ", state=" + state
- + ", createTime=" + createTime
- + ", startTime=" + startTime
- + ", finishTime=" + finishTime
- + '}';
- }
-
- /**
- * Builder.
- */
- public static class Builder {
- private UUID id;
- private JobState state;
- private Instant createTime;
- @Nullable
- private Instant startTime;
- @Nullable
- private Instant finishTime;
-
- /**
- * Constructor.
- */
- public Builder() {
- }
-
- private Builder(JobStatus status) {
- this.id = status.id;
- this.state = status.state;
- this.createTime = status.createTime;
- this.startTime = status.startTime;
- this.finishTime = status.finishTime;
- }
-
- /**
- * Sets job ID.
- *
- * @param id Job ID.
- * @return This builder.
- */
- public Builder id(UUID id) {
- this.id = id;
- return this;
- }
-
- /**
- * Sets job state.
- *
- * @param state Job state.
- * @return This builder.
- */
- public Builder state(JobState state) {
- this.state = state;
- return this;
- }
-
- /**
- * Sets job create time.
- *
- * @param createTime Job create time.
- * @return This builder.
- */
- public Builder createTime(Instant createTime) {
- this.createTime = createTime;
- return this;
- }
-
- /**
- * Sets job start time.
- *
- * @param startTime Job start time.
- * @return This builder.
- */
- public Builder startTime(@Nullable Instant startTime) {
- this.startTime = startTime;
- return this;
- }
-
- /**
- * Sets job finish time.
- *
- * @param finishTime Job finish time.
- * @return This builder.
- */
- public Builder finishTime(@Nullable Instant finishTime) {
- this.finishTime = finishTime;
- return this;
- }
-
- /**
- * Builds a new JobStatus.
- *
- * @return JobStatus.
- */
- public JobStatus build() {
- return new JobStatus(this);
- }
- }
+ Instant finishTime();
}
-
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java
similarity index 96%
rename from
modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
rename to
modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java
index e8afe84c1e..c3df006c38 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java
@@ -28,7 +28,7 @@ import org.apache.ignite.network.ClusterNode;
* A description of the job to be submitted as a result of the split step of
the {@link MapReduceTask}. Reflects the parameters of the
* {@link org.apache.ignite.compute.IgniteCompute#submit(Set, JobDescriptor,
Object...)} method.
*/
-public class ComputeJobRunner {
+public class MapReduceJob {
private final Set<ClusterNode> nodes;
private final JobDescriptor jobDescriptor;
@@ -36,7 +36,7 @@ public class ComputeJobRunner {
private final Object[] args;
- private ComputeJobRunner(
+ private MapReduceJob(
Set<ClusterNode> nodes,
JobDescriptor jobDescriptor,
Object[] args
@@ -150,12 +150,12 @@ public class ComputeJobRunner {
*
* @return Description object.
*/
- public ComputeJobRunner build() {
+ public MapReduceJob build() {
if (nodes.isEmpty()) {
throw new IllegalArgumentException();
}
- return new ComputeJobRunner(nodes, jobDescriptor, args);
+ return new MapReduceJob(nodes, jobDescriptor, args);
}
}
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index 09e88ce1f3..bab9e338f4 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -36,7 +36,7 @@ public interface MapReduceTask<R> {
* @param args Map reduce task arguments.
* @return A list of compute job execution parameters.
*/
- List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object...
args);
+ List<MapReduceJob> split(TaskExecutionContext taskContext, Object... args);
/**
* This is a finishing step in the task execution. This method will be
called with the map from identifiers of compute jobs submitted as
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
similarity index 94%
rename from
modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
rename to
modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
index a12fd2995b..5f1c95946a 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+package org.apache.ignite.compute.task;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobStatus;
import org.jetbrains.annotations.Nullable;
/**
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 8758f78daf..2b36e2108e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 87b9f4356e..130652d36a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -40,7 +40,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index f4535aec1b..50682b85ce 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.compute.JobStatusImpl;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -141,7 +142,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
if (unpacker.tryUnpackNil()) {
return null;
}
- return JobStatus.builder()
+ return JobStatusImpl.builder()
.id(unpacker.unpackUuid())
.state(unpackJobState(unpacker))
.createTime(unpacker.unpackInstant())
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
index c058b7d2b8..4efa9694d1 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -31,7 +31,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.jetbrains.annotations.Nullable;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 513df297be..3fa2517685 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -51,7 +51,7 @@ import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 071f641244..cc3e9bd366 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -46,10 +46,11 @@ import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
+import org.apache.ignite.internal.compute.JobStatusImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.network.ClusterNode;
@@ -201,7 +202,7 @@ public class FakeCompute implements IgniteComputeInternal {
private <R> JobExecution<R> jobExecution(CompletableFuture<R> result) {
UUID jobId = UUID.randomUUID();
- JobStatus status = JobStatus.builder()
+ JobStatus status = JobStatusImpl.builder()
.id(jobId)
.state(EXECUTING)
.createTime(Instant.now())
@@ -211,7 +212,7 @@ public class FakeCompute implements IgniteComputeInternal {
result.whenComplete((r, throwable) -> {
JobState state = throwable != null ? FAILED : COMPLETED;
- JobStatus newStatus =
status.toBuilder().id(jobId).state(state).finishTime(Instant.now()).build();
+ JobStatus newStatus =
JobStatusImpl.toBuilder(status).state(state).finishTime(Instant.now()).build();
statuses.put(jobId, newStatus);
});
return new JobExecution<>() {
@@ -239,7 +240,7 @@ public class FakeCompute implements IgniteComputeInternal {
private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) {
BiFunction<UUID, JobState, JobStatus> toStatus = (id, jobState) ->
- JobStatus.builder()
+ JobStatusImpl.builder()
.id(id)
.state(jobState)
.createTime(Instant.now())
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 6dbcf5c935..9d11f5bca8 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -48,7 +48,7 @@ import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index 344d11cd22..fde5d473a7 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -38,7 +38,7 @@ import java.time.Instant;
import java.util.List;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.compute.utils.InteractiveJobs;
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index 3a67dcdcad..d8827cf734 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.JobDescriptor;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
@@ -153,7 +153,7 @@ public final class InteractiveTasks {
*/
private static class GlobalInteractiveMapReduceTask implements
MapReduceTask<List<String>> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();
offerArgsAsSignals(args);
@@ -169,7 +169,7 @@ public final class InteractiveTasks {
break;
case SPLIT_RETURN_ALL_NODES:
return
context.ignite().clusterNodes().stream().map(node ->
- ComputeJobRunner.builder()
+ MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build())
.nodes(Set.of(node))
.build()
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
index 2ace7c693e..a48578e35b 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -26,18 +26,18 @@ import java.util.UUID;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionOptions;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
/** Map reduce task which runs a {@link GetNodeNameJob} on each node and
computes a sum of length of all node names. */
public class MapReduce implements MapReduceTask<Integer> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext taskContext,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext taskContext,
Object... args) {
List<DeploymentUnit> deploymentUnits = (List<DeploymentUnit>) args[0];
return taskContext.ignite().clusterNodes().stream().map(node ->
- ComputeJobRunner.builder()
+ MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(GetNodeNameJob.class)
.units(deploymentUnits)
.options(JobExecutionOptions.builder()
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index ee207688cb..dadccee035 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -28,7 +28,7 @@ import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.network.ClusterNode;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index e05883dd33..15b4c511e8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.compute.task.JobSubmitter;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.network.ClusterNode;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 43cab4f3d6..0ab6bc697e 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.executor.ComputeExecutor;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index e2e0a2fe24..279f5acadc 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -86,7 +86,7 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
}
private static JobStatus failedStatus() {
- return
JobStatus.builder().id(UUID.randomUUID()).createTime(Instant.now()).state(JobState.FAILED).build();
+ return
JobStatusImpl.builder().id(UUID.randomUUID()).createTime(Instant.now()).state(JobState.FAILED).build();
}
/**
@@ -127,7 +127,7 @@ class FailSafeJobExecution<T> implements JobExecution<T> {
capturedStatus.compareAndSet(null, jobStatus);
}
- return jobStatus.toBuilder()
+ return JobStatusImpl.toBuilder(jobStatus)
.createTime(capturedStatus.get().createTime())
.id(capturedStatus.get().id())
.build();
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 8b2ee9cca1..78387092a4 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -47,8 +47,8 @@ import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.NodeNotFoundException;
-import org.apache.ignite.compute.TaskExecution;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
@@ -346,7 +346,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal {
return sync(executeMapReduceAsync(units, taskClassName, args));
}
- private JobExecution<Object> submitJob(ComputeJobRunner runner) {
+ private JobExecution<Object> submitJob(MapReduceJob runner) {
return submit(runner.nodes(), runner.jobDescriptor(), runner.args());
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index 32aac5375c..3684dad9aa 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -51,7 +51,7 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
}
@Override
- public boolean isInterrupted() {
+ public boolean isCancelled() {
return isInterrupted.get();
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
index 56e2803a4d..105009ca32 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -22,7 +22,7 @@ import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertT
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.jetbrains.annotations.Nullable;
/**
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
index 4654ee1d17..8a44c4b4da 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
@@ -32,6 +32,7 @@ import java.util.function.Function;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.Cleaner;
+import org.apache.ignite.internal.compute.JobStatusImpl;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -74,7 +75,7 @@ public class InMemoryComputeStateMachine implements
ComputeStateMachine {
@Override
public UUID initJob() {
UUID uuid = UUID.randomUUID();
- JobStatus status = JobStatus.builder()
+ JobStatus status = JobStatusImpl.builder()
.id(uuid)
.state(QUEUED)
.createTime(Instant.now())
@@ -141,7 +142,7 @@ public class InMemoryComputeStateMachine implements
ComputeStateMachine {
validateStateTransition(jobId, currentState, newState);
- JobStatus.Builder builder =
currentStatus.toBuilder().state(newState);
+ JobStatusImpl.Builder builder =
JobStatusImpl.toBuilder(currentStatus).state(newState);
if (newState == EXECUTING) {
builder.startTime(Instant.now());
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
index 5adf264c18..64c5a3fb62 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
@@ -21,7 +21,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.compute.AntiHijackJobExecution;
import org.jetbrains.annotations.Nullable;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
index 3da429d893..0037ff013f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.compute.task;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecution;
import org.jetbrains.annotations.Nullable;
/**
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
index 400c41322a..5f5c98829f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.compute.task;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
/**
* Compute job submitter.
@@ -30,5 +30,5 @@ public interface JobSubmitter {
*
* @param computeJobRunner Computer job start parameters.
*/
- JobExecution<Object> submit(ComputeJobRunner computeJobRunner);
+ JobExecution<Object> submit(MapReduceJob computeJobRunner);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index ab462224a8..32f1883226 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -43,9 +43,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.JobStatusImpl;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -100,7 +101,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
);
executionsFuture = splitExecution.resultAsync().thenApply(splitResult
-> {
- List<ComputeJobRunner> runners = splitResult.runners();
+ List<MapReduceJob> runners = splitResult.runners();
LOG.debug("Submitting {} jobs for {}", runners.size(),
taskClass.getName());
return submit(runners, jobSubmitter);
});
@@ -125,12 +126,16 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
if (throwable != null) {
// Capture the reduce execution failure reason and time.
JobState state = throwable instanceof CancellationException ?
CANCELED : FAILED;
- reduceFailedStatus.set(
- splitExecution.status().toBuilder()
- .state(state)
- .finishTime(Instant.now())
- .build()
- );
+
+ JobStatus status = splitExecution.status();
+ if (status != null) {
+ reduceFailedStatus.set(
+ JobStatusImpl.toBuilder(status)
+ .state(state)
+ .finishTime(Instant.now())
+ .build()
+ );
+ }
}
}
@@ -159,7 +164,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
if (reduceStatus == null) {
return null;
}
- return reduceStatus.toBuilder()
+ return JobStatusImpl.toBuilder(reduceStatus)
.id(splitStatus.id())
.createTime(splitStatus.createTime())
.startTime(splitStatus.startTime())
@@ -170,7 +175,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
}
// At this point split is complete but reduce job is not submitted yet.
- return completedFuture(splitStatus.toBuilder()
+ return completedFuture(JobStatusImpl.toBuilder(splitStatus)
.state(EXECUTING)
.finishTime(null)
.build());
@@ -267,7 +272,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
});
}
- private static <R> List<JobExecution<Object>>
submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) {
+ private static <R> List<JobExecution<Object>> submit(List<MapReduceJob>
runners, JobSubmitter jobSubmitter) {
return runners.stream()
.map(jobSubmitter::submit)
.collect(toList());
@@ -276,14 +281,14 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
private static class SplitResult<R> {
private final MapReduceTask<R> task;
- private final List<ComputeJobRunner> runners;
+ private final List<MapReduceJob> runners;
- private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner>
runners) {
+ private SplitResult(MapReduceTask<R> task, List<MapReduceJob> runners)
{
this.task = task;
this.runners = runners;
}
- private List<ComputeJobRunner> runners() {
+ private List<MapReduceJob> runners() {
return runners;
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 124373aceb..afd538acda 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -68,7 +68,6 @@ import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobState;
-import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
@@ -316,7 +315,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
private void respondWithJobStatusResponseWhenJobStatusRequestIsSent(UUID
jobId, JobState jobState) {
JobStatusResponse jobStatusResponse = new
ComputeMessagesFactory().jobStatusResponse()
-
.status(JobStatus.builder().id(jobId).state(jobState).createTime(Instant.now()).build())
+
.status(JobStatusImpl.builder().id(jobId).state(jobState).createTime(Instant.now()).build())
.build();
when(messagingService.invoke(any(ClusterNode.class), argThat(msg ->
jobStatusRequestWithJobId(msg, jobId)), anyLong()))
.thenReturn(completedFuture(jobStatusResponse));
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
index 8f9269789f..b1c532bbf7 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
@@ -48,10 +48,10 @@ class JobExecutionContextImplTest extends
BaseIgniteAbstractTest {
JobExecutionContext context = new JobExecutionContextImpl(ignite,
isInterrupted, ClassLoader.getSystemClassLoader());
- assertThat(context.isInterrupted(), is(false));
+ assertThat(context.isCancelled(), is(false));
isInterrupted.set(true);
- assertThat(context.isInterrupted(), is(true));
+ assertThat(context.isCancelled(), is(true));
}
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 38b5f2637d..523f99a5f6 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -119,7 +119,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
public Integer execute(JobExecutionContext context, Object... args) {
while (true) {
try {
- if (context.isInterrupted()) {
+ if (context.isCancelled()) {
return 0;
}
Thread.sleep(100);
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java
similarity index 84%
copy from modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java
index df0fd541cd..dc153986b7 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.compute;
+package org.apache.ignite.internal.compute;
-import java.io.Serializable;
import java.time.Instant;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
* Job status.
*/
-public class JobStatus implements Serializable {
+public class JobStatusImpl implements JobStatus {
private static final long serialVersionUID = 8575969461073736006L;
/**
@@ -56,7 +58,7 @@ public class JobStatus implements Serializable {
@Nullable
private final Instant finishTime;
- private JobStatus(Builder builder) {
+ private JobStatusImpl(Builder builder) {
this.id = Objects.requireNonNull(builder.id, "id");
this.state = Objects.requireNonNull(builder.state, "state");
this.createTime = Objects.requireNonNull(builder.createTime,
"createTime");
@@ -78,6 +80,7 @@ public class JobStatus implements Serializable {
*
* @return Job ID.
*/
+ @Override
public UUID id() {
return id;
}
@@ -87,6 +90,7 @@ public class JobStatus implements Serializable {
*
* @return Job state.
*/
+ @Override
public JobState state() {
return state;
}
@@ -96,6 +100,7 @@ public class JobStatus implements Serializable {
*
* @return Job create time.
*/
+ @Override
public Instant createTime() {
return createTime;
}
@@ -106,6 +111,7 @@ public class JobStatus implements Serializable {
* @return Job start time. {@code null} if the job has not started yet.
*/
@Nullable
+ @Override
public Instant startTime() {
return startTime;
}
@@ -116,6 +122,7 @@ public class JobStatus implements Serializable {
* @return Job finish time. {@code null} if the job has not finished yet.
*/
@Nullable
+ @Override
public Instant finishTime() {
return finishTime;
}
@@ -125,19 +132,13 @@ public class JobStatus implements Serializable {
*
* @return Builder.
*/
- public Builder toBuilder() {
- return new Builder(this);
+ public static Builder toBuilder(JobStatus status) {
+ return new Builder(status);
}
@Override
public String toString() {
- return "JobStatus{"
- + "id=" + id
- + ", state=" + state
- + ", createTime=" + createTime
- + ", startTime=" + startTime
- + ", finishTime=" + finishTime
- + '}';
+ return S.toString(this);
}
/**
@@ -155,15 +156,20 @@ public class JobStatus implements Serializable {
/**
* Constructor.
*/
- public Builder() {
+ private Builder() {
}
+ /**
+ * Constructor.
+ *
+ * @param status Job status for copy.
+ */
private Builder(JobStatus status) {
- this.id = status.id;
- this.state = status.state;
- this.createTime = status.createTime;
- this.startTime = status.startTime;
- this.finishTime = status.finishTime;
+ this.id = status.id();
+ this.state = status.state();
+ this.createTime = status.createTime();
+ this.startTime = status.startTime();
+ this.finishTime = status.finishTime();
}
/**
@@ -226,8 +232,8 @@ public class JobStatus implements Serializable {
*
* @return JobStatus.
*/
- public JobStatus build() {
- return new JobStatus(this);
+ public JobStatusImpl build() {
+ return new JobStatusImpl(this);
}
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index e028b48af6..f6d310ae9a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -76,9 +76,9 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
-import org.apache.ignite.compute.TaskExecution;
-import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -804,9 +804,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class MapReduceNodeNameTask implements
MapReduceTask<String> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
return context.ignite().clusterNodes().stream()
- .map(node -> ComputeJobRunner.builder()
+ .map(node -> MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build())
.nodes(Set.of(node))
.args(args)
@@ -824,9 +824,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class MapReduceArgsTask implements MapReduceTask<String> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
return context.ignite().clusterNodes().stream()
- .map(node -> ComputeJobRunner.builder()
+ .map(node -> MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(ConcatJob.class).build())
.nodes(Set.of(node))
.args(args)
@@ -844,7 +844,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class MapReduceExceptionOnSplitTask implements
MapReduceTask<String> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR,
"Custom job error", null);
}
@@ -857,9 +857,9 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class MapReduceExceptionOnReduceTask implements
MapReduceTask<String> {
@Override
- public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ public List<MapReduceJob> split(TaskExecutionContext context,
Object... args) {
return context.ignite().clusterNodes().stream()
- .map(node -> ComputeJobRunner.builder()
+ .map(node -> MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build())
.nodes(Set.of(node))
.args(args)