This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-22434 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 3e7bcb957c863d1cd693a0d6f6db87c9657e227b Author: Mikhail Pochatkin <[email protected]> AuthorDate: Fri Jun 28 10:32:57 2024 +0300 IGNITE-22434 Do not inherit TaskExecution from JobExecution --- .../apache/ignite/compute/task/TaskExecution.java | 46 +++++++++++++++++++++- .../ignite/internal/compute/ItMapReduceTest.java | 8 ++-- .../internal/compute/AntiHijackJobExecution.java | 2 +- .../internal/compute/ComputeComponentImpl.java | 2 +- .../internal/compute/TaskExecutionWrapper.java | 24 ++++++++++- ...ecution.java => TaskToJobExecutionWrapper.java} | 32 ++++++--------- .../compute/task/AntiHijackTaskExecution.java | 32 +++++++++++++-- modules/runner/build.gradle | 1 + .../runner/app/client/ItThinClientComputeTest.java | 5 ++- 9 files changed, 118 insertions(+), 34 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java index 280aadfe28..836119891c 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java @@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable; * * @param <R> Task result type. */ -public interface TaskExecution<R> extends JobExecution<R> { +public interface TaskExecution<R> { /** * Returns a collection of states of the jobs which are executing under this task. The resulting future is completed only after the * jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job state has been exceeded. @@ -50,4 +50,48 @@ public interface TaskExecution<R> extends JobExecution<R> { .map(state -> state != null ? state.id() : null) .collect(Collectors.toList())); } + + /** + * Returns task's execution result. + * + * @return Job's execution result future. + */ + CompletableFuture<R> resultAsync(); + + /** + * Returns the current state of the task. The task state may be deleted and thus return {@code null} if the time for retaining task + * state has been exceeded. + * + * @return The current state of the task, or {@code null} if the task state no longer exists due to exceeding the retention time limit. + */ + CompletableFuture<@Nullable JobState> stateAsync(); + + /** + * Returns the id of the task. The task state may be deleted and thus return {@code null} if the time for retaining task state has been + * exceeded. + * + * @return The id of the task, or {@code null} if the job state no longer exists due to exceeding the retention time limit. + */ + default CompletableFuture<@Nullable UUID> idAsync() { + return stateAsync().thenApply(state -> state != null ? state.id() : null); + } + + /** + * Cancels the task. + * + * @return The future which will be completed with {@code true} when the task is cancelled, {@code false} when the task couldn't be + * cancelled (if it's already completed or in the process of cancelling), or {@code null} if the task no longer exists due to + * exceeding the retention time limit. + */ + CompletableFuture<@Nullable Boolean> cancelAsync(); + + /** + * Changes task priority. After priority change task will be the last in the queue of tasks with the same priority. + * + * @param newPriority new priority. + * @return The future which will be completed with {@code true} when the priority is changed, {@code false} when the priority couldn't + * be changed (if the job is already executing or completed), or {@code null} if the job no longer exists due to exceeding the + * retention time limit. + */ + CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority); } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java index 834b427fb3..1ef59d9ab4 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java @@ -68,7 +68,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest { // Given running task. TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name()); - TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution); + TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution)); testExecution.assertExecuting(); InteractiveTasks.GlobalApi.assertAlive(); @@ -179,7 +179,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest { // Given running task. TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name()); - TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution); + TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution)); testExecution.assertExecuting(); InteractiveTasks.GlobalApi.assertAlive(); @@ -236,7 +236,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest { // Given running task. String arg = cooperativeCancel ? "NO_INTERRUPT" : ""; TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name(), arg); - TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution); + TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution)); testExecution.assertExecuting(); InteractiveTasks.GlobalApi.assertAlive(); @@ -267,7 +267,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest { private static TaskExecution<List<String>> startTask(IgniteImpl entryNode, Object... args) throws InterruptedException { TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name(), args); - new TestingJobExecution<>(taskExecution).assertExecuting(); + new TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution)).assertExecuting(); InteractiveTasks.GlobalApi.assertAlive(); return taskExecution; } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java index 20901f947d..d68af92a79 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java @@ -59,7 +59,7 @@ public class AntiHijackJobExecution<R> implements JobExecution<R> { return preventThreadHijack(execution.changePriorityAsync(newPriority)); } - protected <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) { + private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) { return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index ee49434632..856f82f18a 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -174,7 +174,7 @@ public class ComputeComponentImpl implements ComputeComponent { inFlightFutures.registerFuture(taskFuture); DelegatingTaskExecution<R> result = new DelegatingTaskExecution<>(taskFuture); - result.idAsync().thenAccept(jobId -> executionManager.addExecution(jobId, result)); + result.idAsync().thenAccept(jobId -> executionManager.addExecution(jobId, new TaskToJobExecutionWrapper<>(result))); return result; } finally { busyLock.leaveBusy(); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java index d2cabdc8e7..8d013c9021 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute; +import static org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture; import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture; import java.util.List; @@ -30,11 +31,10 @@ import org.jetbrains.annotations.Nullable; * * @param <R> Result type. */ -class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements TaskExecution<R> { +class TaskExecutionWrapper<R> implements TaskExecution<R> { private final TaskExecution<R> delegate; TaskExecutionWrapper(TaskExecution<R> delegate) { - super(delegate); this.delegate = delegate; } @@ -42,4 +42,24 @@ class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements TaskExec public CompletableFuture<List<@Nullable JobState>> statesAsync() { return convertToPublicFuture(delegate.statesAsync()); } + + @Override + public CompletableFuture<R> resultAsync() { + return convertToComputeFuture(delegate.resultAsync()); + } + + @Override + public CompletableFuture<@Nullable JobState> stateAsync() { + return convertToPublicFuture(delegate.stateAsync()); + } + + @Override + public CompletableFuture<@Nullable Boolean> cancelAsync() { + return convertToPublicFuture(delegate.cancelAsync()); + } + + @Override + public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) { + return convertToPublicFuture(delegate.changePriorityAsync(newPriority)); + } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java similarity index 56% copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java copy to modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java index 20901f947d..3a778c1241 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java @@ -18,48 +18,40 @@ package org.apache.ignite.internal.compute; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import org.apache.ignite.compute.JobExecution; import org.apache.ignite.compute.JobState; -import org.apache.ignite.internal.thread.PublicApiThreading; +import org.apache.ignite.compute.task.TaskExecution; import org.jetbrains.annotations.Nullable; /** - * Wrapper around {@link JobExecution} that adds protection against thread hijacking by users. + * Representation of {@link TaskExecution} as {@link JobExecution}. + * + * @param <R> Job result type. */ -public class AntiHijackJobExecution<R> implements JobExecution<R> { - private final JobExecution<R> execution; - private final Executor asyncContinuationExecutor; +public class TaskToJobExecutionWrapper<R> implements JobExecution<R> { + private final TaskExecution<R> taskExecution; - /** - * Constructor. - */ - public AntiHijackJobExecution(JobExecution<R> execution, Executor asyncContinuationExecutor) { - this.execution = execution; - this.asyncContinuationExecutor = asyncContinuationExecutor; + public TaskToJobExecutionWrapper(TaskExecution<R> taskExecution) { + this.taskExecution = taskExecution; } @Override public CompletableFuture<R> resultAsync() { - return preventThreadHijack(execution.resultAsync()); + return taskExecution.resultAsync(); } @Override public CompletableFuture<@Nullable JobState> stateAsync() { - return preventThreadHijack(execution.stateAsync()); + return taskExecution.stateAsync(); } @Override public CompletableFuture<@Nullable Boolean> cancelAsync() { - return preventThreadHijack(execution.cancelAsync()); + return taskExecution.cancelAsync(); } @Override public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) { - return preventThreadHijack(execution.changePriorityAsync(newPriority)); - } - - protected <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) { - return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor); + return taskExecution.changePriorityAsync(newPriority); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java index c8c9a7d96e..1faa01453f 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java @@ -22,15 +22,17 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.apache.ignite.compute.JobState; import org.apache.ignite.compute.task.TaskExecution; -import org.apache.ignite.internal.compute.AntiHijackJobExecution; +import org.apache.ignite.internal.thread.PublicApiThreading; import org.jetbrains.annotations.Nullable; /** * Wrapper around {@link TaskExecution} that adds protection against thread hijacking by users. */ -public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R> implements TaskExecution<R> { +public class AntiHijackTaskExecution<R> implements TaskExecution<R> { private final TaskExecution<R> execution; + private final Executor asyncContinuationExecutor; + /** * Constructor. * @@ -38,12 +40,36 @@ public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R> implem * @param asyncContinuationExecutor Executor to which the execution will be resubmitted. */ public AntiHijackTaskExecution(TaskExecution<R> execution, Executor asyncContinuationExecutor) { - super(execution, asyncContinuationExecutor); this.execution = execution; + this.asyncContinuationExecutor = asyncContinuationExecutor; } @Override public CompletableFuture<List<@Nullable JobState>> statesAsync() { return preventThreadHijack(execution.statesAsync()); } + + @Override + public CompletableFuture<R> resultAsync() { + return preventThreadHijack(execution.resultAsync()); + } + + @Override + public CompletableFuture<@Nullable JobState> stateAsync() { + return preventThreadHijack(execution.stateAsync()); + } + + @Override + public CompletableFuture<@Nullable Boolean> cancelAsync() { + return preventThreadHijack(execution.cancelAsync()); + } + + @Override + public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) { + return preventThreadHijack(execution.changePriorityAsync(newPriority)); + } + + private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) { + return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor); + } } diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle index d3a710d688..0de5eb81d0 100644 --- a/modules/runner/build.gradle +++ b/modules/runner/build.gradle @@ -130,6 +130,7 @@ dependencies { integrationTestImplementation project(':ignite-raft-api') integrationTestImplementation project(':ignite-replicator') integrationTestImplementation project(':ignite-client') + integrationTestImplementation project(':ignite-compute') integrationTestImplementation project(':ignite-client-handler') integrationTestImplementation project(':ignite-storage-api') integrationTestImplementation project(':ignite-storage-api') diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java index 4bcb14fe44..045d784f92 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java @@ -84,6 +84,7 @@ import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.compute.task.TaskExecutionContext; +import org.apache.ignite.internal.compute.TaskToJobExecutionWrapper; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; @@ -727,8 +728,8 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { @ParameterizedTest @ValueSource(classes = {MapReduceExceptionOnSplitTask.class, MapReduceExceptionOnReduceTask.class}) void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) { - IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submitMapReduce(List.of(), taskClass.getName()) + IgniteException cause = getExceptionInJobExecutionAsync(new TaskToJobExecutionWrapper<>( + client().compute().submitMapReduce(List.of(), taskClass.getName())) ); assertThat(cause.getMessage(), containsString("Custom job error"));
