This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c6e825249e3 IGNITE-18713 Fixed task options are not applied if placed
before withExecutor modifier. (#10517)
c6e825249e3 is described below
commit c6e825249e3b382f1494d1f54e656b9cba036162
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Feb 7 14:30:42 2023 +0300
IGNITE-18713 Fixed task options are not applied if placed before
withExecutor modifier. (#10517)
---
.../apache/ignite/internal/IgniteComputeImpl.java | 45 +++++++++++----------
.../processors/task/TaskExecutionOptions.java | 19 +++++++++
.../IgniteComputeCustomExecutorSelfTest.java | 46 ++++++++++++++++++++++
3 files changed, 90 insertions(+), 20 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index f4a0d125f5a..1e0d4674b03 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -69,9 +69,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
private String execName;
/** Default task execution options. */
- private final ThreadLocal<TaskExecutionOptions> opts =
ThreadLocal.withInitial(() ->
- TaskExecutionOptions.options(prj.nodes()).withExecutor(execName)
- );
+ private final ThreadLocal<TaskExecutionOptions> opts =
ThreadLocal.withInitial(() -> enrich(TaskExecutionOptions.options()));
/**
* Required by {@link Externalizable}.
@@ -85,19 +83,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @param prj Projection.
*/
public IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj) {
- this(ctx, prj, false);
- }
-
- /**
- * @param ctx Kernal context.
- * @param prj Projection.
- * @param async Async support flag.
- */
- private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj,
boolean async) {
- super(async);
-
- this.ctx = ctx;
- this.prj = prj;
+ this(ctx, prj, false, null, null);
}
/**
@@ -107,19 +93,28 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @param prj Projection.
* @param async Async support flag.
* @param execName Custom executor name.
+ * @param opts Optional initial task execution options.
*/
- private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj,
boolean async,
- String execName) {
+ private IgniteComputeImpl(
+ GridKernalContext ctx,
+ ClusterGroupAdapter prj,
+ boolean async,
+ String execName,
+ @Nullable TaskExecutionOptions opts
+ ) {
super(async);
this.ctx = ctx;
this.prj = prj;
this.execName = execName;
+
+ if (opts != null)
+ this.opts.set(enrich(TaskExecutionOptions.options(opts)));
}
/** {@inheritDoc} */
@Override protected IgniteCompute createAsyncInstance() {
- return new IgniteComputeImpl(ctx, prj, true);
+ return new IgniteComputeImpl(ctx, prj, true, execName, opts.get());
}
/** {@inheritDoc} */
@@ -1136,6 +1131,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public IgniteCompute withExecutor(@NotNull String name) {
- return new IgniteComputeImpl(ctx, prj, isAsync(), name);
+ return new IgniteComputeImpl(ctx, prj, isAsync(), name, opts.get());
+ }
+
+ /** Enriches specified task execution options with those that are bound
to the current compute instance. */
+ private TaskExecutionOptions enrich(TaskExecutionOptions opts) {
+ opts.withProjection(prj.nodes());
+
+ if (execName != null)
+ opts.withExecutor(execName);
+
+ return opts;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
index f30630d4423..132f2c3fe66 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
@@ -57,6 +57,20 @@ public class TaskExecutionOptions {
/** */
private TaskExecutionOptions() {}
+ /** */
+ private TaskExecutionOptions(TaskExecutionOptions other) {
+ name = other.name;
+ timeout = other.timeout;
+ execName = other.execName;
+ pool = other.pool;
+ projection = other.projection;
+ projectionPredicate = other.projectionPredicate;
+ isFailoverDisabled = other.isFailoverDisabled;
+ isResultCacheDisabled = other.isResultCacheDisabled;
+ isSysTask = other.isSysTask;
+ isAuthDisabled = other.isAuthDisabled;
+ }
+
/** */
public static TaskExecutionOptions options() {
return new TaskExecutionOptions();
@@ -67,6 +81,11 @@ public class TaskExecutionOptions {
return new TaskExecutionOptions().withProjection(projection);
}
+ /** */
+ public static TaskExecutionOptions options(TaskExecutionOptions other) {
+ return new TaskExecutionOptions(other);
+ }
+
/** */
public long timeout() {
return timeout;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
index 73eafe12985..f9a2ec9a430 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ExecutorConfiguration;
@@ -34,6 +35,7 @@ import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -221,6 +223,25 @@ public class IgniteComputeCustomExecutorSelfTest extends
GridCommonAbstractTest
comp.execute(TestTask.class, null);
}
+ /** */
+ @Test
+ public void testMultipleTaskModifiersWithExecutorName() throws Exception {
+ String taskName = "test-task-name";
+
+ IgniteCompute compute = grid(0).compute().withName(taskName);
+
+ // Here we check that withExecutor call does not affect existing
compute instances.
+ compute.withExecutor(EXEC_NAME0);
+ compute.broadcast(new TestRunnable(taskName, "pub"));
+
+ IgniteCompute computeWithExecutor =
compute.withName(taskName).withExecutor(EXEC_NAME0);
+
+ computeWithExecutor.broadcast(new TestRunnable(taskName, EXEC_NAME0));
+ computeWithExecutor.broadcast(new
TestRunnable(TestRunnable.class.getName(), EXEC_NAME0));
+ computeWithExecutor.withName(taskName).broadcast(new
TestRunnable(taskName, EXEC_NAME0));
+
computeWithExecutor.withExecutor(EXEC_NAME1).withName(taskName).broadcast(new
TestRunnable(taskName, EXEC_NAME1));
+ }
+
/**
* Test task
*/
@@ -247,4 +268,29 @@ public class IgniteComputeCustomExecutorSelfTest extends
GridCommonAbstractTest
return null;
}
}
+
+ /** */
+ private static class TestRunnable implements IgniteRunnable {
+ /** */
+ private final String expTaskName;
+
+ /** */
+ private final String expExecName;
+
+ /** */
+ public TestRunnable(String expTaskName, String expExecName) {
+ this.expTaskName = expTaskName;
+ this.expExecName = expExecName;
+ }
+
+ /** */
+ @TaskSessionResource
+ ComputeTaskSession ses;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ assertEquals(expTaskName, ses.getTaskName());
+ assertTrue(Thread.currentThread().getName().contains(expExecName));
+ }
+ }
}