This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch IGNITE-22434
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 3e7bcb957c863d1cd693a0d6f6db87c9657e227b
Author: Mikhail Pochatkin <[email protected]>
AuthorDate: Fri Jun 28 10:32:57 2024 +0300

    IGNITE-22434 Do not inherit TaskExecution from JobExecution
---
 .../apache/ignite/compute/task/TaskExecution.java  | 46 +++++++++++++++++++++-
 .../ignite/internal/compute/ItMapReduceTest.java   |  8 ++--
 .../internal/compute/AntiHijackJobExecution.java   |  2 +-
 .../internal/compute/ComputeComponentImpl.java     |  2 +-
 .../internal/compute/TaskExecutionWrapper.java     | 24 ++++++++++-
 ...ecution.java => TaskToJobExecutionWrapper.java} | 32 ++++++---------
 .../compute/task/AntiHijackTaskExecution.java      | 32 +++++++++++++--
 modules/runner/build.gradle                        |  1 +
 .../runner/app/client/ItThinClientComputeTest.java |  5 ++-
 9 files changed, 118 insertions(+), 34 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java 
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
index 280aadfe28..836119891c 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecution.java
@@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Task result type.
  */
-public interface TaskExecution<R> extends JobExecution<R> {
+public interface TaskExecution<R> {
     /**
      * Returns a collection of states of the jobs which are executing under 
this task. The resulting future is completed only after the
      * jobs are submitted for execution. The list could contain {@code null} 
values if the time for retaining job state has been exceeded.
@@ -50,4 +50,48 @@ public interface TaskExecution<R> extends JobExecution<R> {
                 .map(state -> state != null ? state.id() : null)
                 .collect(Collectors.toList()));
     }
+
+    /**
+     * Returns task's execution result.
+     *
+     * @return Job's execution result future.
+     */
+    CompletableFuture<R> resultAsync();
+
+    /**
+     * Returns the current state of the task. The task state may be deleted 
and thus return {@code null} if the time for retaining task
+     * state has been exceeded.
+     *
+     * @return The current state of the task, or {@code null} if the task 
state no longer exists due to exceeding the retention time limit.
+     */
+    CompletableFuture<@Nullable JobState> stateAsync();
+
+    /**
+     * Returns the id of the task. The task state may be deleted and thus 
return {@code null} if the time for retaining task state has been
+     * exceeded.
+     *
+     * @return The id of the task, or {@code null} if the job state no longer 
exists due to exceeding the retention time limit.
+     */
+    default CompletableFuture<@Nullable UUID> idAsync() {
+        return stateAsync().thenApply(state -> state != null ? state.id() : 
null);
+    }
+
+    /**
+     * Cancels the task.
+     *
+     * @return The future which will be completed with {@code true} when the 
task is cancelled, {@code false} when the task couldn't be
+     *         cancelled (if it's already completed or in the process of 
cancelling), or {@code null} if the task no longer exists due to
+     *         exceeding the retention time limit.
+     */
+    CompletableFuture<@Nullable Boolean> cancelAsync();
+
+    /**
+     * Changes task priority. After priority change task will be the last in 
the queue of tasks with the same priority.
+     *
+     * @param newPriority new priority.
+     * @return The future which will be completed with {@code true} when the 
priority is changed, {@code false} when the priority couldn't
+     *         be changed (if the job is already executing or completed), or 
{@code null} if the job no longer exists due to exceeding the
+     *         retention time limit.
+     */
+    CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority);
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index 834b427fb3..1ef59d9ab4 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -68,7 +68,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
 
         // Given running task.
         TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name());
-        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(taskExecution);
+        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
 
@@ -179,7 +179,7 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
         // Given running task.
         TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name());
-        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(taskExecution);
+        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
 
@@ -236,7 +236,7 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
         // Given running task.
         String arg = cooperativeCancel ? "NO_INTERRUPT" : "";
         TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name(), arg);
-        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(taskExecution);
+        TestingJobExecution<List<String>> testExecution = new 
TestingJobExecution<>(new TaskToJobExecutionWrapper<>(taskExecution));
         testExecution.assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
 
@@ -267,7 +267,7 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
 
     private static TaskExecution<List<String>> startTask(IgniteImpl entryNode, 
Object... args) throws InterruptedException {
         TaskExecution<List<String>> taskExecution = 
entryNode.compute().submitMapReduce(List.of(), 
InteractiveTasks.GlobalApi.name(), args);
-        new TestingJobExecution<>(taskExecution).assertExecuting();
+        new TestingJobExecution<>(new 
TaskToJobExecutionWrapper<>(taskExecution)).assertExecuting();
         InteractiveTasks.GlobalApi.assertAlive();
         return taskExecution;
     }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
index 20901f947d..d68af92a79 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
@@ -59,7 +59,7 @@ public class AntiHijackJobExecution<R> implements 
JobExecution<R> {
         return preventThreadHijack(execution.changePriorityAsync(newPriority));
     }
 
-    protected <T> CompletableFuture<T> 
preventThreadHijack(CompletableFuture<T> originalFuture) {
+    private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> 
originalFuture) {
         return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index ee49434632..856f82f18a 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -174,7 +174,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
             inFlightFutures.registerFuture(taskFuture);
 
             DelegatingTaskExecution<R> result = new 
DelegatingTaskExecution<>(taskFuture);
-            result.idAsync().thenAccept(jobId -> 
executionManager.addExecution(jobId, result));
+            result.idAsync().thenAccept(jobId -> 
executionManager.addExecution(jobId, new TaskToJobExecutionWrapper<>(result)));
             return result;
         } finally {
             busyLock.leaveBusy();
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
index d2cabdc8e7..8d013c9021 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import static 
org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 
 import java.util.List;
@@ -30,11 +31,10 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Result type.
  */
-class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements 
TaskExecution<R> {
+class TaskExecutionWrapper<R> implements TaskExecution<R> {
     private final TaskExecution<R> delegate;
 
     TaskExecutionWrapper(TaskExecution<R> delegate) {
-        super(delegate);
         this.delegate = delegate;
     }
 
@@ -42,4 +42,24 @@ class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> 
implements TaskExec
     public CompletableFuture<List<@Nullable JobState>> statesAsync() {
         return convertToPublicFuture(delegate.statesAsync());
     }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return convertToComputeFuture(delegate.resultAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobState> stateAsync() {
+        return convertToPublicFuture(delegate.stateAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        return convertToPublicFuture(delegate.cancelAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
+        return 
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
similarity index 56%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
copy to 
modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
index 20901f947d..3a778c1241 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskToJobExecutionWrapper.java
@@ -18,48 +18,40 @@
 package org.apache.ignite.internal.compute;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
-import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.compute.task.TaskExecution;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Wrapper around {@link JobExecution} that adds protection against thread 
hijacking by users.
+ * Representation of {@link TaskExecution} as {@link JobExecution}.
+ *
+ * @param <R> Job result type.
  */
-public class AntiHijackJobExecution<R> implements JobExecution<R> {
-    private final JobExecution<R> execution;
-    private final Executor asyncContinuationExecutor;
+public class TaskToJobExecutionWrapper<R> implements JobExecution<R> {
+    private final TaskExecution<R> taskExecution;
 
-    /**
-     * Constructor.
-     */
-    public AntiHijackJobExecution(JobExecution<R> execution, Executor 
asyncContinuationExecutor) {
-        this.execution = execution;
-        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    public TaskToJobExecutionWrapper(TaskExecution<R> taskExecution) {
+        this.taskExecution = taskExecution;
     }
 
     @Override
     public CompletableFuture<R> resultAsync() {
-        return preventThreadHijack(execution.resultAsync());
+        return taskExecution.resultAsync();
     }
 
     @Override
     public CompletableFuture<@Nullable JobState> stateAsync() {
-        return preventThreadHijack(execution.stateAsync());
+        return taskExecution.stateAsync();
     }
 
     @Override
     public CompletableFuture<@Nullable Boolean> cancelAsync() {
-        return preventThreadHijack(execution.cancelAsync());
+        return taskExecution.cancelAsync();
     }
 
     @Override
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
-        return preventThreadHijack(execution.changePriorityAsync(newPriority));
-    }
-
-    protected <T> CompletableFuture<T> 
preventThreadHijack(CompletableFuture<T> originalFuture) {
-        return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
+        return taskExecution.changePriorityAsync(newPriority);
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
index c8c9a7d96e..1faa01453f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
@@ -22,15 +22,17 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.task.TaskExecution;
-import org.apache.ignite.internal.compute.AntiHijackJobExecution;
+import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper around {@link TaskExecution} that adds protection against thread 
hijacking by users.
  */
-public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R> 
implements TaskExecution<R> {
+public class AntiHijackTaskExecution<R> implements TaskExecution<R> {
     private final TaskExecution<R> execution;
 
+    private final Executor asyncContinuationExecutor;
+
     /**
      * Constructor.
      *
@@ -38,12 +40,36 @@ public class AntiHijackTaskExecution<R> extends 
AntiHijackJobExecution<R> implem
      * @param asyncContinuationExecutor Executor to which the execution will 
be resubmitted.
      */
     public AntiHijackTaskExecution(TaskExecution<R> execution, Executor 
asyncContinuationExecutor) {
-        super(execution, asyncContinuationExecutor);
         this.execution = execution;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
     }
 
     @Override
     public CompletableFuture<List<@Nullable JobState>> statesAsync() {
         return preventThreadHijack(execution.statesAsync());
     }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return preventThreadHijack(execution.resultAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobState> stateAsync() {
+        return preventThreadHijack(execution.stateAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        return preventThreadHijack(execution.cancelAsync());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
+        return preventThreadHijack(execution.changePriorityAsync(newPriority));
+    }
+
+    private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> 
originalFuture) {
+        return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
+    }
 }
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index d3a710d688..0de5eb81d0 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -130,6 +130,7 @@ dependencies {
     integrationTestImplementation project(':ignite-raft-api')
     integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-client')
+    integrationTestImplementation project(':ignite-compute')
     integrationTestImplementation project(':ignite-client-handler')
     integrationTestImplementation project(':ignite-storage-api')
     integrationTestImplementation project(':ignite-storage-api')
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 4bcb14fe44..045d784f92 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -84,6 +84,7 @@ import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.TaskToJobExecutionWrapper;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
@@ -727,8 +728,8 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
     @ParameterizedTest
     @ValueSource(classes = {MapReduceExceptionOnSplitTask.class, 
MapReduceExceptionOnReduceTask.class})
     void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
-        IgniteException cause = getExceptionInJobExecutionAsync(
-                client().compute().submitMapReduce(List.of(), 
taskClass.getName())
+        IgniteException cause = getExceptionInJobExecutionAsync(new 
TaskToJobExecutionWrapper<>(
+                client().compute().submitMapReduce(List.of(), 
taskClass.getName()))
         );
 
         assertThat(cause.getMessage(), containsString("Custom job error"));

Reply via email to