This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c6e825249e3 IGNITE-18713 Fixed task options are not applied if placed 
before withExecutor modifier. (#10517)
c6e825249e3 is described below

commit c6e825249e3b382f1494d1f54e656b9cba036162
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Feb 7 14:30:42 2023 +0300

    IGNITE-18713 Fixed task options are not applied if placed before 
withExecutor modifier. (#10517)
---
 .../apache/ignite/internal/IgniteComputeImpl.java  | 45 +++++++++++----------
 .../processors/task/TaskExecutionOptions.java      | 19 +++++++++
 .../IgniteComputeCustomExecutorSelfTest.java       | 46 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 20 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index f4a0d125f5a..1e0d4674b03 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -69,9 +69,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     private String execName;
 
     /** Default task execution options. */
-    private final ThreadLocal<TaskExecutionOptions> opts = 
ThreadLocal.withInitial(() ->
-        TaskExecutionOptions.options(prj.nodes()).withExecutor(execName)
-    );
+    private final ThreadLocal<TaskExecutionOptions> opts = 
ThreadLocal.withInitial(() -> enrich(TaskExecutionOptions.options()));
 
     /**
      * Required by {@link Externalizable}.
@@ -85,19 +83,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @param prj Projection.
      */
     public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj) {
-        this(ctx, prj, false);
-    }
-
-    /**
-     * @param ctx Kernal context.
-     * @param prj Projection.
-     * @param async Async support flag.
-     */
-    private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, 
boolean async) {
-        super(async);
-
-        this.ctx = ctx;
-        this.prj = prj;
+        this(ctx, prj, false, null, null);
     }
 
     /**
@@ -107,19 +93,28 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @param prj Projection.
      * @param async Async support flag.
      * @param execName Custom executor name.
+     * @param opts Optional initial task execution options.
      */
-    private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, 
boolean async,
-        String execName) {
+    private IgniteComputeImpl(
+        GridKernalContext ctx,
+        ClusterGroupAdapter prj,
+        boolean async,
+        String execName,
+        @Nullable TaskExecutionOptions opts
+    ) {
         super(async);
 
         this.ctx = ctx;
         this.prj = prj;
         this.execName = execName;
+
+        if (opts != null)
+            this.opts.set(enrich(TaskExecutionOptions.options(opts)));
     }
 
     /** {@inheritDoc} */
     @Override protected IgniteCompute createAsyncInstance() {
-        return new IgniteComputeImpl(ctx, prj, true);
+        return new IgniteComputeImpl(ctx, prj, true, execName, opts.get());
     }
 
     /** {@inheritDoc} */
@@ -1136,6 +1131,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
 
     /** {@inheritDoc} */
     @Override public IgniteCompute withExecutor(@NotNull String name) {
-        return new IgniteComputeImpl(ctx, prj, isAsync(), name);
+        return new IgniteComputeImpl(ctx, prj, isAsync(), name, opts.get());
+    }
+
+    /** Enriches specified task execution options with those that are bounded 
to the current compute instance. */
+    private TaskExecutionOptions enrich(TaskExecutionOptions opts) {
+        opts.withProjection(prj.nodes());
+
+        if (execName != null)
+            opts.withExecutor(execName);
+        
+        return opts;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
index f30630d4423..132f2c3fe66 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
@@ -57,6 +57,20 @@ public class TaskExecutionOptions {
     /** */
     private TaskExecutionOptions() {}
 
+    /** */
+    private TaskExecutionOptions(TaskExecutionOptions other) {
+        name = other.name;
+        timeout = other.timeout;
+        execName = other.execName;
+        pool = other.pool;
+        projection = other.projection;
+        projectionPredicate = other.projectionPredicate;
+        isFailoverDisabled = other.isFailoverDisabled;
+        isResultCacheDisabled = other.isResultCacheDisabled;
+        isSysTask = other.isSysTask;
+        isAuthDisabled = other.isAuthDisabled;
+    }
+
     /** */
     public static TaskExecutionOptions options() {
         return new TaskExecutionOptions();
@@ -67,6 +81,11 @@ public class TaskExecutionOptions {
         return new TaskExecutionOptions().withProjection(projection);
     }
 
+    /** */
+    public static TaskExecutionOptions options(TaskExecutionOptions other) {
+        return new TaskExecutionOptions(other);
+    }
+
     /** */
     public long timeout() {
         return timeout;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
index 73eafe12985..f9a2ec9a430 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ExecutorConfiguration;
@@ -34,6 +35,7 @@ import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
@@ -221,6 +223,25 @@ public class IgniteComputeCustomExecutorSelfTest extends 
GridCommonAbstractTest
         comp.execute(TestTask.class, null);
     }
 
+    /** */
+    @Test
+    public void testMultipleTaskModifiersWithExecutorName() throws Exception {
+        String taskName = "test-task-name";
+
+        IgniteCompute compute = grid(0).compute().withName(taskName);
+
+        // Here we check that withExecutor call does not affect existing 
compute instances.
+        compute.withExecutor(EXEC_NAME0);
+        compute.broadcast(new TestRunnable(taskName, "pub"));
+
+        IgniteCompute computeWithExecutor = 
compute.withName(taskName).withExecutor(EXEC_NAME0);
+
+        computeWithExecutor.broadcast(new TestRunnable(taskName, EXEC_NAME0));
+        computeWithExecutor.broadcast(new 
TestRunnable(TestRunnable.class.getName(), EXEC_NAME0));
+        computeWithExecutor.withName(taskName).broadcast(new 
TestRunnable(taskName, EXEC_NAME0));
+        
computeWithExecutor.withExecutor(EXEC_NAME1).withName(taskName).broadcast(new 
TestRunnable(taskName, EXEC_NAME1));
+    }
+
     /**
      * Test task
      */
@@ -247,4 +268,29 @@ public class IgniteComputeCustomExecutorSelfTest extends 
GridCommonAbstractTest
             return null;
         }
     }
+
+    /** */
+    private static class TestRunnable implements IgniteRunnable {
+        /** */
+        private final String expTaskName;
+
+        /** */
+        private final String expExecName;
+
+        /** */
+        public TestRunnable(String expTaskName, String expExecName) {
+            this.expTaskName = expTaskName;
+            this.expExecName = expExecName;
+        }
+
+        /** */
+        @TaskSessionResource
+        ComputeTaskSession ses;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            assertEquals(expTaskName, ses.getTaskName());
+            assertTrue(Thread.currentThread().getName().contains(expExecName));
+        }
+    }
 }

Reply via email to