This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-22431 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 66f961f6f2f04150f7a0a92d93e551188fccd250 Author: Mikhail Pochatkin <[email protected]> AuthorDate: Fri Jun 14 14:34:25 2024 +0300 IGNITE-22431 Rename ComputeJobRunner to MapReduceJob --- .../org/apache/ignite/compute/IgniteCompute.java | 1 + .../apache/ignite/compute/JobExecutionContext.java | 6 +- .../java/org/apache/ignite/compute/JobStatus.java | 181 +-------------------- .../{ComputeJobRunner.java => MapReduceJob.java} | 8 +- .../apache/ignite/compute/task/MapReduceTask.java | 2 +- .../ignite/compute/{ => task}/TaskExecution.java | 4 +- .../ClientComputeExecuteMapReduceRequest.java | 2 +- .../internal/client/compute/ClientCompute.java | 2 +- .../client/compute/ClientJobExecution.java | 3 +- .../client/compute/ClientTaskExecution.java | 2 +- .../apache/ignite/client/ClientComputeTest.java | 2 +- .../apache/ignite/client/fakes/FakeCompute.java | 9 +- .../ignite/internal/compute/ItComputeBaseTest.java | 2 +- .../ignite/internal/compute/ItMapReduceTest.java | 2 +- .../internal/compute/utils/InteractiveTasks.java | 6 +- .../apache/ignite/internal/compute/MapReduce.java | 6 +- .../internal/compute/AntiHijackIgniteCompute.java | 2 +- .../ignite/internal/compute/ComputeComponent.java | 2 +- .../internal/compute/ComputeComponentImpl.java | 2 +- .../internal/compute/FailSafeJobExecution.java | 4 +- .../ignite/internal/compute/IgniteComputeImpl.java | 6 +- .../internal/compute/JobExecutionContextImpl.java | 2 +- .../internal/compute/TaskExecutionWrapper.java | 2 +- .../compute/state/InMemoryComputeStateMachine.java | 5 +- .../compute/task/AntiHijackTaskExecution.java | 2 +- .../compute/task/DelegatingTaskExecution.java | 2 +- .../ignite/internal/compute/task/JobSubmitter.java | 4 +- .../compute/task/TaskExecutionInternal.java | 33 ++-- .../internal/compute/ComputeComponentImplTest.java | 3 +- .../compute/JobExecutionContextImplTest.java | 4 +- .../compute/executor/ComputeExecutorTest.java | 2 +- .../ignite/internal/compute/JobStatusImpl.java} | 35 ++-- .../runner/app/client/ItThinClientComputeTest.java | 18 +- 33 files changed, 109 insertions(+), 257 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index c4bf268c32..54b23d478c 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.task.MapReduceTask; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java index 58a037047f..d4d0cace2e 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java @@ -31,9 +31,9 @@ public interface JobExecutionContext { Ignite ignite(); /** - * Flag indicating whether the job was interrupted. + * Flag indicating whether the job was cancelled. * - * @return {@code true} when the job was interrupted. + * @return {@code true} when the job was cancelled. */ - boolean isInterrupted(); + boolean isCancelled(); } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java index df0fd541cd..0127067c04 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java @@ -19,86 +19,33 @@ package org.apache.ignite.compute; import java.io.Serializable; import java.time.Instant; -import java.util.Objects; import java.util.UUID; import org.jetbrains.annotations.Nullable; /** * Job status. */ -public class JobStatus implements Serializable { - private static final long serialVersionUID = 8575969461073736006L; - - /** - * Job ID. - */ - private final UUID id; - - /** - * Job state. - */ - private final JobState state; - - /** - * Job create time. - */ - private final Instant createTime; - - /** - * Job start time. - */ - @Nullable - private final Instant startTime; - - /** - * Job finish time. - */ - @Nullable - private final Instant finishTime; - - private JobStatus(Builder builder) { - this.id = Objects.requireNonNull(builder.id, "id"); - this.state = Objects.requireNonNull(builder.state, "state"); - this.createTime = Objects.requireNonNull(builder.createTime, "createTime"); - this.startTime = builder.startTime; - this.finishTime = builder.finishTime; - } - - /** - * Creates a new builder. - * - * @return Builder. - */ - public static Builder builder() { - return new Builder(); - } - +public interface JobStatus extends Serializable { /** * Returns job ID. * * @return Job ID. */ - public UUID id() { - return id; - } + UUID id(); /** * Returns job state. * * @return Job state. */ - public JobState state() { - return state; - } + JobState state(); /** * Returns job create time. * * @return Job create time. */ - public Instant createTime() { - return createTime; - } + Instant createTime(); /** * Returns job start time. {@code null} if the job has not started yet. @@ -106,9 +53,7 @@ public class JobStatus implements Serializable { * @return Job start time. {@code null} if the job has not started yet. */ @Nullable - public Instant startTime() { - return startTime; - } + Instant startTime(); /** * Returns job finish time. {@code null} if the job has not finished yet. @@ -116,119 +61,5 @@ public class JobStatus implements Serializable { * @return Job finish time. {@code null} if the job has not finished yet. */ @Nullable - public Instant finishTime() { - return finishTime; - } - - /** - * Returns a new builder with the same property values as this JobStatus. - * - * @return Builder. - */ - public Builder toBuilder() { - return new Builder(this); - } - - @Override - public String toString() { - return "JobStatus{" - + "id=" + id - + ", state=" + state - + ", createTime=" + createTime - + ", startTime=" + startTime - + ", finishTime=" + finishTime - + '}'; - } - - /** - * Builder. - */ - public static class Builder { - private UUID id; - private JobState state; - private Instant createTime; - @Nullable - private Instant startTime; - @Nullable - private Instant finishTime; - - /** - * Constructor. - */ - public Builder() { - } - - private Builder(JobStatus status) { - this.id = status.id; - this.state = status.state; - this.createTime = status.createTime; - this.startTime = status.startTime; - this.finishTime = status.finishTime; - } - - /** - * Sets job ID. - * - * @param id Job ID. - * @return This builder. - */ - public Builder id(UUID id) { - this.id = id; - return this; - } - - /** - * Sets job state. - * - * @param state Job state. - * @return This builder. - */ - public Builder state(JobState state) { - this.state = state; - return this; - } - - /** - * Sets job create time. - * - * @param createTime Job create time. - * @return This builder. - */ - public Builder createTime(Instant createTime) { - this.createTime = createTime; - return this; - } - - /** - * Sets job start time. - * - * @param startTime Job start time. - * @return This builder. - */ - public Builder startTime(@Nullable Instant startTime) { - this.startTime = startTime; - return this; - } - - /** - * Sets job finish time. - * - * @param finishTime Job finish time. - * @return This builder. - */ - public Builder finishTime(@Nullable Instant finishTime) { - this.finishTime = finishTime; - return this; - } - - /** - * Builds a new JobStatus. - * - * @return JobStatus. - */ - public JobStatus build() { - return new JobStatus(this); - } - } + Instant finishTime(); } - diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java similarity index 96% rename from modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java rename to modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java index e8afe84c1e..c3df006c38 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceJob.java @@ -28,7 +28,7 @@ import org.apache.ignite.network.ClusterNode; * A description of the job to be submitted as a result of the split step of the {@link MapReduceTask}. Reflects the parameters of the * {@link org.apache.ignite.compute.IgniteCompute#submit(Set, JobDescriptor, Object...)} method. */ -public class ComputeJobRunner { +public class MapReduceJob { private final Set<ClusterNode> nodes; private final JobDescriptor jobDescriptor; @@ -36,7 +36,7 @@ public class ComputeJobRunner { private final Object[] args; - private ComputeJobRunner( + private MapReduceJob( Set<ClusterNode> nodes, JobDescriptor jobDescriptor, Object[] args @@ -150,12 +150,12 @@ public class ComputeJobRunner { * * @return Description object. */ - public ComputeJobRunner build() { + public MapReduceJob build() { if (nodes.isEmpty()) { throw new IllegalArgumentException(); } - return new ComputeJobRunner(nodes, jobDescriptor, args); + return new MapReduceJob(nodes, jobDescriptor, args); } } } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java index 09e88ce1f3..bab9e338f4 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java @@ -36,7 +36,7 @@ public interface MapReduceTask<R> { * @param args Map reduce task arguments. * @return A list of compute job execution parameters. */ - List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args); + List<MapReduceJob> split(TaskExecutionContext taskContext, Object... args); /** * This is a finishing step in the task execution. This method will be called with the map from identifiers of compute jobs submitted as diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java similarity index 94% rename from modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java rename to modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java index a12fd2995b..5f1c95946a 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package org.apache.ignite.compute; +package org.apache.ignite.compute.task; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import org.apache.ignite.compute.JobExecution; +import org.apache.ignite.compute.JobStatus; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java index 8758f78daf..2b36e2108e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java @@ -28,7 +28,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.NotificationSender; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.compute.IgniteComputeInternal; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java index 87b9f4356e..130652d36a 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java @@ -40,7 +40,7 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.client.ClientUtils; import org.apache.ignite.internal.client.PayloadInputChannel; import org.apache.ignite.internal.client.PayloadOutputChannel; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java index f4535aec1b..50682b85ce 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.client.PayloadInputChannel; import org.apache.ignite.internal.client.ReliableChannel; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.ClientOp; +import org.apache.ignite.internal.compute.JobStatusImpl; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; @@ -141,7 +142,7 @@ class ClientJobExecution<R> implements JobExecution<R> { if (unpacker.tryUnpackNil()) { return null; } - return JobStatus.builder() + return JobStatusImpl.builder() .id(unpacker.unpackUuid()) .state(unpackJobState(unpacker)) .createTime(unpacker.unpackInstant()) diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java index 9df7d73b6a..b8d5de253e 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java @@ -30,7 +30,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.client.PayloadInputChannel; import org.apache.ignite.internal.client.ReliableChannel; import org.apache.ignite.internal.util.CompletableFutures; diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java index 513df297be..3fa2517685 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java @@ -51,7 +51,7 @@ import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.client.table.ClientTable; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java index 071f641244..66d4794e06 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java @@ -46,10 +46,11 @@ import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.ComputeUtils; import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.internal.compute.JobExecutionContextImpl; +import org.apache.ignite.internal.compute.JobStatusImpl; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.network.ClusterNode; @@ -201,7 +202,7 @@ public class FakeCompute implements IgniteComputeInternal { private <R> JobExecution<R> jobExecution(CompletableFuture<R> result) { UUID jobId = UUID.randomUUID(); - JobStatus status = JobStatus.builder() + JobStatus status = JobStatusImpl.builder() .id(jobId) .state(EXECUTING) .createTime(Instant.now()) @@ -211,7 +212,7 @@ public class FakeCompute implements IgniteComputeInternal { result.whenComplete((r, throwable) -> { JobState state = throwable != null ? FAILED : COMPLETED; - JobStatus newStatus = status.toBuilder().id(jobId).state(state).finishTime(Instant.now()).build(); + JobStatus newStatus = new JobStatusImpl.Builder(status).id(jobId).state(state).finishTime(Instant.now()).build(); statuses.put(jobId, newStatus); }); return new JobExecution<>() { @@ -239,7 +240,7 @@ public class FakeCompute implements IgniteComputeInternal { private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) { BiFunction<UUID, JobState, JobStatus> toStatus = (id, jobState) -> - JobStatus.builder() + JobStatusImpl.builder() .id(id) .state(jobState) .createTime(Instant.now()) diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index 6dbcf5c935..9d11f5bca8 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -48,7 +48,7 @@ import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.util.ExceptionUtils; diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java index 344d11cd22..fde5d473a7 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java @@ -38,7 +38,7 @@ import java.time.Instant; import java.util.List; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.compute.utils.InteractiveJobs; diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java index 3a67dcdcad..d8827cf734 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java @@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.compute.JobDescriptor; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecutionContext; @@ -153,7 +153,7 @@ public final class InteractiveTasks { */ private static class GlobalInteractiveMapReduceTask implements MapReduceTask<List<String>> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext context, Object... args) { RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet(); offerArgsAsSignals(args); @@ -169,7 +169,7 @@ public final class InteractiveTasks { break; case SPLIT_RETURN_ALL_NODES: return context.ignite().clusterNodes().stream().map(node -> - ComputeJobRunner.builder() + MapReduceJob.builder() .jobDescriptor(JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build()) .nodes(Set.of(node)) .build() diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java index 2ace7c693e..a48578e35b 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java @@ -26,18 +26,18 @@ import java.util.UUID; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecutionOptions; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecutionContext; /** Map reduce task which runs a {@link GetNodeNameJob} on each node and computes a sum of length of all node names. */ public class MapReduce implements MapReduceTask<Integer> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext taskContext, Object... args) { List<DeploymentUnit> deploymentUnits = (List<DeploymentUnit>) args[0]; return taskContext.ignite().clusterNodes().stream().map(node -> - ComputeJobRunner.builder() + MapReduceJob.builder() .jobDescriptor(JobDescriptor.builder(GetNodeNameJob.class) .units(deploymentUnits) .options(JobExecutionOptions.builder() diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java index ee207688cb..dadccee035 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java @@ -28,7 +28,7 @@ import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution; import org.apache.ignite.internal.wrapper.Wrapper; import org.apache.ignite.network.ClusterNode; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java index e05883dd33..15b4c511e8 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java @@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.task.JobSubmitter; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.network.ClusterNode; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index 54dde09fc9..9ea96b55aa 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.compute.executor.ComputeExecutor; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java index e2e0a2fe24..b222e1c8b9 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java @@ -86,7 +86,7 @@ class FailSafeJobExecution<T> implements JobExecution<T> { } private static JobStatus failedStatus() { - return JobStatus.builder().id(UUID.randomUUID()).createTime(Instant.now()).state(JobState.FAILED).build(); + return JobStatusImpl.builder().id(UUID.randomUUID()).createTime(Instant.now()).state(JobState.FAILED).build(); } /** @@ -127,7 +127,7 @@ class FailSafeJobExecution<T> implements JobExecution<T> { capturedStatus.compareAndSet(null, jobStatus); } - return jobStatus.toBuilder() + return new JobStatusImpl.Builder(jobStatus) .createTime(capturedStatus.get().createTime()) .id(capturedStatus.get().id()) .build(); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 9f5a5a180e..a3bb610f9b 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -47,8 +47,8 @@ import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobStatus; import org.apache.ignite.compute.NodeNotFoundException; -import org.apache.ignite.compute.TaskExecution; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -346,7 +346,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal { return sync(executeMapReduceAsync(units, taskClassName, args)); } - private JobExecution<Object> submitJob(ComputeJobRunner runner) { + private JobExecution<Object> submitJob(MapReduceJob runner) { return submit(runner.nodes(), runner.jobDescriptor(), runner.args()); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java index 32aac5375c..3684dad9aa 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java @@ -51,7 +51,7 @@ public class JobExecutionContextImpl implements JobExecutionContext { } @Override - public boolean isInterrupted() { + public boolean isCancelled() { return isInterrupted.get(); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java index 56e2803a4d..105009ca32 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java @@ -22,7 +22,7 @@ import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertT import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java index 4654ee1d17..2bb280725e 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java @@ -32,6 +32,7 @@ import java.util.function.Function; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; import org.apache.ignite.internal.compute.Cleaner; +import org.apache.ignite.internal.compute.JobStatusImpl; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -74,7 +75,7 @@ public class InMemoryComputeStateMachine implements ComputeStateMachine { @Override public UUID initJob() { UUID uuid = UUID.randomUUID(); - JobStatus status = JobStatus.builder() + JobStatus status = JobStatusImpl.builder() .id(uuid) .state(QUEUED) .createTime(Instant.now()) @@ -141,7 +142,7 @@ public class InMemoryComputeStateMachine implements ComputeStateMachine { validateStateTransition(jobId, currentState, newState); - JobStatus.Builder builder = currentStatus.toBuilder().state(newState); + JobStatusImpl.Builder builder = new JobStatusImpl.Builder(currentStatus).state(newState); if (newState == EXECUTING) { builder.startTime(Instant.now()); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java index 5adf264c18..64c5a3fb62 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.internal.compute.AntiHijackJobExecution; import org.jetbrains.annotations.Nullable; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java index 3da429d893..0037ff013f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.compute.task; import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.TaskExecution; +import org.apache.ignite.compute.task.TaskExecution; import org.jetbrains.annotations.Nullable; /** diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java index 400c41322a..5f5c98829f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.compute.task; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; /** * Compute job submitter. @@ -30,5 +30,5 @@ public interface JobSubmitter { * * @param computeJobRunner Computer job start parameters. */ - JobExecution<Object> submit(ComputeJobRunner computeJobRunner); + JobExecution<Object> submit(MapReduceJob computeJobRunner); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java index 0b842d1974..0aa0d5ea58 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java @@ -42,9 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecutionContext; +import org.apache.ignite.internal.compute.JobStatusImpl; import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor; import org.apache.ignite.internal.compute.queue.QueueExecution; import org.apache.ignite.internal.logger.IgniteLogger; @@ -100,7 +101,7 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { ); executionsFuture = splitExecution.resultAsync().thenApply(splitResult -> { - List<ComputeJobRunner> runners = splitResult.runners(); + List<MapReduceJob> runners = splitResult.runners(); LOG.debug("Submitting {} jobs for {}", runners.size(), taskClass.getName()); return submit(runners, jobSubmitter); }); @@ -125,12 +126,16 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { if (throwable != null) { // Capture the reduce execution failure reason and time. JobState state = throwable instanceof CancellationException ? CANCELED : FAILED; - reduceFailedStatus.set( - splitExecution.status().toBuilder() - .state(state) - .finishTime(Instant.now()) - .build() - ); + + JobStatus status = splitExecution.status(); + if (status != null) { + reduceFailedStatus.set( + new JobStatusImpl.Builder(status) + .state(state) + .finishTime(Instant.now()) + .build() + ); + } } } @@ -159,7 +164,7 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { if (reduceStatus == null) { return null; } - return reduceStatus.toBuilder() + return new JobStatusImpl.Builder(reduceStatus) .id(splitStatus.id()) .createTime(splitStatus.createTime()) .startTime(splitStatus.startTime()) @@ -170,7 +175,7 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { } // At this point split is complete but reduce job is not submitted yet. - return completedFuture(splitStatus.toBuilder() + return completedFuture(new JobStatusImpl.Builder(splitStatus) .state(EXECUTING) .finishTime(null) .build()); @@ -267,7 +272,7 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { }); } - private static <R> List<JobExecution<Object>> submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) { + private static <R> List<JobExecution<Object>> submit(List<MapReduceJob> runners, JobSubmitter jobSubmitter) { return runners.stream() .map(jobSubmitter::submit) .collect(toList()); @@ -276,14 +281,14 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { private static class SplitResult<R> { private final MapReduceTask<R> task; - private final List<ComputeJobRunner> runners; + private final List<MapReduceJob> runners; - private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner> runners) { + private SplitResult(MapReduceTask<R> task, List<MapReduceJob> runners) { this.task = task; this.runners = runners; } - private List<ComputeJobRunner> runners() { + private List<MapReduceJob> runners() { return runners; } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java index dfe0efe770..284fc3bf87 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java @@ -68,7 +68,6 @@ import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.compute.JobState; -import org.apache.ignite.compute.JobStatus; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; @@ -316,7 +315,7 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { private void respondWithJobStatusResponseWhenJobStatusRequestIsSent(UUID jobId, JobState jobState) { JobStatusResponse jobStatusResponse = new ComputeMessagesFactory().jobStatusResponse() - .status(JobStatus.builder().id(jobId).state(jobState).createTime(Instant.now()).build()) + .status(JobStatusImpl.builder().id(jobId).state(jobState).createTime(Instant.now()).build()) .build(); when(messagingService.invoke(any(ClusterNode.class), argThat(msg -> jobStatusRequestWithJobId(msg, jobId)), anyLong())) .thenReturn(completedFuture(jobStatusResponse)); diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java index 8f9269789f..b1c532bbf7 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java @@ -48,10 +48,10 @@ class JobExecutionContextImplTest extends BaseIgniteAbstractTest { JobExecutionContext context = new JobExecutionContextImpl(ignite, isInterrupted, ClassLoader.getSystemClassLoader()); - assertThat(context.isInterrupted(), is(false)); + assertThat(context.isCancelled(), is(false)); isInterrupted.set(true); - assertThat(context.isInterrupted(), is(true)); + assertThat(context.isCancelled(), is(true)); } } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java index 38b5f2637d..523f99a5f6 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java @@ -119,7 +119,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { public Integer execute(JobExecutionContext context, Object... args) { while (true) { try { - if (context.isInterrupted()) { + if (context.isCancelled()) { return 0; } Thread.sleep(100); diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java similarity index 87% copy from modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java copy to modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java index df0fd541cd..18c2231723 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/compute/JobStatusImpl.java @@ -15,18 +15,19 @@ * limitations under the License. */ -package org.apache.ignite.compute; +package org.apache.ignite.internal.compute; -import java.io.Serializable; import java.time.Instant; import java.util.Objects; import java.util.UUID; +import org.apache.ignite.compute.JobState; +import org.apache.ignite.compute.JobStatus; import org.jetbrains.annotations.Nullable; /** * Job status. */ -public class JobStatus implements Serializable { +public class JobStatusImpl implements JobStatus { private static final long serialVersionUID = 8575969461073736006L; /** @@ -56,7 +57,7 @@ public class JobStatus implements Serializable { @Nullable private final Instant finishTime; - private JobStatus(Builder builder) { + private JobStatusImpl(Builder builder) { this.id = Objects.requireNonNull(builder.id, "id"); this.state = Objects.requireNonNull(builder.state, "state"); this.createTime = Objects.requireNonNull(builder.createTime, "createTime"); @@ -78,6 +79,7 @@ public class JobStatus implements Serializable { * * @return Job ID. */ + @Override public UUID id() { return id; } @@ -87,6 +89,7 @@ public class JobStatus implements Serializable { * * @return Job state. */ + @Override public JobState state() { return state; } @@ -96,6 +99,7 @@ public class JobStatus implements Serializable { * * @return Job create time. */ + @Override public Instant createTime() { return createTime; } @@ -106,6 +110,7 @@ public class JobStatus implements Serializable { * @return Job start time. {@code null} if the job has not started yet. */ @Nullable + @Override public Instant startTime() { return startTime; } @@ -116,6 +121,7 @@ public class JobStatus implements Serializable { * @return Job finish time. {@code null} if the job has not finished yet. */ @Nullable + @Override public Instant finishTime() { return finishTime; } @@ -158,12 +164,17 @@ public class JobStatus implements Serializable { public Builder() { } - private Builder(JobStatus status) { - this.id = status.id; - this.state = status.state; - this.createTime = status.createTime; - this.startTime = status.startTime; - this.finishTime = status.finishTime; + /** + * Constructor. + * + * @param status Job status for copy. + */ + public Builder(JobStatus status) { + this.id = status.id(); + this.state = status.state(); + this.createTime = status.createTime(); + this.startTime = status.startTime(); + this.finishTime = status.finishTime(); } /** @@ -226,8 +237,8 @@ public class JobStatus implements Serializable { * * @return JobStatus. */ - public JobStatus build() { - return new JobStatus(this); + public JobStatusImpl build() { + return new JobStatusImpl(this); } } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java index e028b48af6..f6d310ae9a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java @@ -76,9 +76,9 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobExecutionContext; -import org.apache.ignite.compute.TaskExecution; -import org.apache.ignite.compute.task.ComputeJobRunner; +import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; +import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.compute.task.TaskExecutionContext; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; @@ -804,9 +804,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { private static class MapReduceNodeNameTask implements MapReduceTask<String> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext context, Object... args) { return context.ignite().clusterNodes().stream() - .map(node -> ComputeJobRunner.builder() + .map(node -> MapReduceJob.builder() .jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build()) .nodes(Set.of(node)) .args(args) @@ -824,9 +824,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { private static class MapReduceArgsTask implements MapReduceTask<String> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext context, Object... args) { return context.ignite().clusterNodes().stream() - .map(node -> ComputeJobRunner.builder() + .map(node -> MapReduceJob.builder() .jobDescriptor(JobDescriptor.builder(ConcatJob.class).build()) .nodes(Set.of(node)) .args(args) @@ -844,7 +844,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { private static class MapReduceExceptionOnSplitTask implements MapReduceTask<String> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext context, Object... args) { throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, "Custom job error", null); } @@ -857,9 +857,9 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { private static class MapReduceExceptionOnReduceTask implements MapReduceTask<String> { @Override - public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) { + public List<MapReduceJob> split(TaskExecutionContext context, Object... args) { return context.ignite().clusterNodes().stream() - .map(node -> ComputeJobRunner.builder() + .map(node -> MapReduceJob.builder() .jobDescriptor(JobDescriptor.builder(NodeNameJob.class).build()) .nodes(Set.of(node)) .args(args)
