This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2abc2470c2c IGNITE-18545 Fixed task execution options propagation.
(#10482)
2abc2470c2c is described below
commit 2abc2470c2c0d5226aad5694fb664cd4d45eecfd
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Jan 22 22:00:11 2023 +0300
IGNITE-18545 Fixed task execution options propagation. (#10482)
---
.../main/java/org/apache/ignite/IgniteCompute.java | 4 +-
.../apache/ignite/internal/IgniteComputeImpl.java | 254 +++++++-------
.../internal/executor/GridExecutorService.java | 32 +-
.../processors/affinity/GridAffinityProcessor.java | 10 +-
.../processors/cache/GridCacheAdapter.java | 61 ++--
.../datastructures/CacheDataStructuresManager.java | 21 +-
.../distributed/GridDistributedCacheAdapter.java | 16 +-
.../snapshot/IgniteSnapshotManager.java | 64 ++--
.../cache/query/GridCacheQueryManager.java | 18 +-
.../processors/closure/GridClosureProcessor.java | 367 +++++----------------
.../platform/client/compute/ClientComputeTask.java | 21 +-
.../platform/compute/PlatformCompute.java | 4 -
.../handlers/cache/GridCacheCommandHandler.java | 18 +-
.../rest/handlers/task/GridTaskCommandHandler.java | 13 +-
.../processors/service/GridServiceProxy.java | 22 +-
.../processors/task/GridTaskProcessor.java | 166 ++--------
.../processors/task/GridTaskThreadContextKey.java | 47 ---
.../internal/processors/task/GridTaskWorker.java | 41 +--
.../processors/task/TaskExecutionOptions.java | 189 +++++++++++
.../main/resources/META-INF/classnames.properties | 2 +-
.../internal/GridContinuousTaskSelfTest.java | 22 +-
.../IgniteComputeTopologyExceptionTest.java | 9 +-
.../compute/TaskOptionsPropagationTest.java | 299 +++++++++++++++++
.../testsuites/IgniteComputeGridTestSuite.java | 4 +-
24 files changed, 939 insertions(+), 765 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index 6e9cbea852d..e6110c9d189 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -724,9 +724,7 @@ public interface IgniteCompute extends IgniteAsyncSupport {
/**
* Sets task timeout for the next executed task in the <b>current
thread</b>.
- * When task starts execution, the timeout is reset, so one timeout is
used only once. You may use
- * this method to set task name when executing jobs directly, without
explicitly
- * defining {@link ComputeTask}.
+ * When task starts execution, the timeout is reset, so one timeout is
used only once.
* <p>
* Here is an example.
* <pre class="brush:java">
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index ca9f5344d01..f4a0d125f5a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -25,7 +25,6 @@ import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteDeploymentException;
@@ -35,6 +34,7 @@ import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -50,11 +50,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
/**
* {@link IgniteCompute} implementation.
@@ -73,6 +68,11 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** Custom executor name. */
private String execName;
+ /** Default task execution options. */
+ private final ThreadLocal<TaskExecutionOptions> opts =
ThreadLocal.withInitial(() ->
+ TaskExecutionOptions.options(prj.nodes()).withExecutor(execName)
+ );
+
/**
* Required by {@link Externalizable}.
*/
@@ -105,7 +105,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*
* @param ctx Kernal context.
* @param prj Projection.
- * @param subjId Subject ID.
* @param async Async support flag.
* @param execName Custom executor name.
*/
@@ -130,8 +129,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void affinityRun(String cacheName, Object affKey,
IgniteRunnable job) {
- CU.validateCacheName(cacheName);
-
try {
saveOrGet(affinityRunAsync0(cacheName, affKey, job));
}
@@ -143,7 +140,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public IgniteFuture<Void> affinityRunAsync(String cacheName,
Object affKey,
IgniteRunnable job) throws IgniteException {
- CU.validateCacheName(cacheName);
return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheName,
affKey, job));
}
@@ -157,12 +153,13 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private IgniteInternalFuture<?> affinityRunAsync0(String cacheName, Object
affKey, IgniteRunnable job) {
- A.notNull(affKey, "affKey");
- A.notNull(job, "job");
-
guard();
try {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ CU.validateCacheName(cacheName);
+
// In case cache key is passed instead of affinity key.
final Object affKey0 = ctx.affinity().affinityKey(cacheName,
affKey);
int partId = ctx.affinity().partition(cacheName, affKey0);
@@ -171,20 +168,20 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition:
[cache=" + cacheName + " key="
+ affKey + ']');
- return
ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job,
prj.nodes(), execName);
+ return
ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
/** {@inheritDoc} */
@Override public void affinityRun(@NotNull Collection<String> cacheNames,
Object affKey, IgniteRunnable job) {
- CU.validateCacheNames(cacheNames);
-
try {
saveOrGet(affinityRunAsync0(cacheNames, affKey, job));
}
@@ -196,8 +193,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public IgniteFuture<Void> affinityRunAsync(@NotNull
Collection<String> cacheNames, Object affKey,
IgniteRunnable job) throws IgniteException {
- CU.validateCacheNames(cacheNames);
-
return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames,
affKey, job));
}
@@ -211,13 +206,14 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private IgniteInternalFuture<?> affinityRunAsync0(@NotNull
Collection<String> cacheNames, Object affKey,
IgniteRunnable job) {
- A.notNull(affKey, "affKey");
- A.notNull(job, "job");
- A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
guard();
try {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+ CU.validateCacheNames(cacheNames);
+
final String cacheName = F.first(cacheNames);
// In case cache key is passed instead of affinity key.
@@ -228,20 +224,20 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition:
[cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityRun(cacheNames, partId, job,
prj.nodes(), execName);
+ return ctx.closure().affinityRun(cacheNames, partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
/** {@inheritDoc} */
@Override public void affinityRun(@NotNull Collection<String> cacheNames,
int partId, IgniteRunnable job) {
- CU.validateCacheNames(cacheNames);
-
try {
saveOrGet(affinityRunAsync0(cacheNames, partId, job));
}
@@ -253,8 +249,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public IgniteFuture<Void> affinityRunAsync(@NotNull
Collection<String> cacheNames, int partId,
IgniteRunnable job) throws IgniteException {
- CU.validateCacheNames(cacheNames);
-
return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames,
partId, job));
}
@@ -268,27 +262,28 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private IgniteInternalFuture<?> affinityRunAsync0(@NotNull
Collection<String> cacheNames, int partId,
IgniteRunnable job) {
- A.ensure(partId >= 0, "partId = " + partId);
- A.notNull(job, "job");
- A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
guard();
try {
- return ctx.closure().affinityRun(cacheNames, partId, job,
prj.nodes(), execName);
+ A.ensure(partId >= 0, "partId = " + partId);
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+ CU.validateCacheNames(cacheNames);
+
+ return ctx.closure().affinityRun(cacheNames, partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
/** {@inheritDoc} */
@Override public <R> R affinityCall(String cacheName, Object affKey,
IgniteCallable<R> job) {
- CU.validateCacheName(cacheName);
-
try {
return saveOrGet(affinityCallAsync0(cacheName, affKey, job));
}
@@ -300,8 +295,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> affinityCallAsync(String cacheName,
Object affKey,
IgniteCallable<R> job) throws IgniteException {
- CU.validateCacheName(cacheName);
-
return createFuture(affinityCallAsync0(cacheName, affKey, job));
}
@@ -315,12 +308,13 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <R> IgniteInternalFuture<R> affinityCallAsync0(String cacheName,
Object affKey,
IgniteCallable<R> job) {
- A.notNull(affKey, "affKey");
- A.notNull(job, "job");
-
guard();
try {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ CU.validateCacheName(cacheName);
+
// In case cache key is passed instead of affinity key.
final Object affKey0 = ctx.affinity().affinityKey(cacheName,
affKey);
int partId = ctx.affinity().partition(cacheName, affKey0);
@@ -329,20 +323,20 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition:
[cache=" + cacheName + " key="
+ affKey + ']');
- return
ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job,
prj.nodes(), execName);
+ return
ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
/** {@inheritDoc} */
@Override public <R> R affinityCall(@NotNull Collection<String>
cacheNames, Object affKey, IgniteCallable<R> job) {
- CU.validateCacheNames(cacheNames);
-
try {
return saveOrGet(affinityCallAsync0(cacheNames, affKey, job));
}
@@ -354,8 +348,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull
Collection<String> cacheNames, Object affKey,
IgniteCallable<R> job) throws IgniteException {
- CU.validateCacheNames(cacheNames);
-
return createFuture(affinityCallAsync0(cacheNames, affKey, job));
}
@@ -369,13 +361,14 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull
Collection<String> cacheNames, Object affKey,
IgniteCallable<R> job) {
- A.notNull(affKey, "affKey");
- A.notNull(job, "job");
- A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
guard();
try {
+ A.notNull(affKey, "affKey");
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+ CU.validateCacheNames(cacheNames);
+
final String cacheName = F.first(cacheNames);
// In case cache key is passed instead of affinity key.
@@ -386,20 +379,20 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition:
[cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityCall(cacheNames, partId, job,
prj.nodes(), execName);
+ return ctx.closure().affinityCall(cacheNames, partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
/** {@inheritDoc} */
@Override public <R> R affinityCall(@NotNull Collection<String>
cacheNames, int partId, IgniteCallable<R> job) {
- CU.validateCacheNames(cacheNames);
-
try {
return saveOrGet(affinityCallAsync0(cacheNames, partId, job));
}
@@ -411,8 +404,6 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull
Collection<String> cacheNames, int partId,
IgniteCallable<R> job) throws IgniteException {
- CU.validateCacheNames(cacheNames);
-
return createFuture(affinityCallAsync0(cacheNames, partId, job));
}
@@ -426,19 +417,22 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull
Collection<String> cacheNames, int partId,
IgniteCallable<R> job) {
- A.ensure(partId >= 0, "partId = " + partId);
- A.notNull(job, "job");
- A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
guard();
try {
- return ctx.closure().affinityCall(cacheNames, partId, job,
prj.nodes(), execName);
+ A.ensure(partId >= 0, "partId = " + partId);
+ A.notNull(job, "job");
+ A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+ CU.validateCacheNames(cacheNames);
+
+ return ctx.closure().affinityCall(cacheNames, partId, job,
opts.get());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -446,7 +440,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> R execute(String taskName, @Nullable T arg) {
try {
- return (R)saveOrGet(executeAsync0(taskName, arg));
+ return saveOrGet(executeAsync0(taskName, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -466,16 +460,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <T, R> IgniteInternalFuture<R> executeAsync0(String taskName,
@Nullable T arg) {
- A.notNull(taskName, "taskName");
-
guard();
try {
- ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE,
prj.predicate());
+ A.notNull(taskName, "taskName");
- return ctx.task().execute(taskName, arg, execName);
+ return ctx.task().execute(taskName, arg,
opts.get().withProjectionPredicate(prj.predicate()));
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -483,7 +477,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> R execute(Class<? extends ComputeTask<T, R>>
taskCls, @Nullable T arg) {
try {
- return (R)saveOrGet(executeAsync0(taskCls, arg));
+ return saveOrGet(executeAsync0(taskCls, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -504,16 +498,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends
ComputeTask<T, R>> taskCls, @Nullable T arg) {
- A.notNull(taskCls, "taskCls");
-
guard();
try {
- ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE,
prj.predicate());
+ A.notNull(taskCls, "taskCls");
- return ctx.task().execute(taskCls, arg, execName);
+ return ctx.task().execute(taskCls, arg,
opts.get().withProjectionPredicate(prj.predicate()));
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -521,7 +515,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg)
{
try {
- return (R)saveOrGet(executeAsync0(task, arg));
+ return saveOrGet(executeAsync0(task, arg));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -542,16 +536,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Task future.
*/
public <T, R> ComputeTaskInternalFuture<R> executeAsync0(ComputeTask<T, R>
task, @Nullable T arg) {
- A.notNull(task, "task");
-
guard();
try {
- ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE,
prj.predicate());
+ A.notNull(task, "task");
- return ctx.task().execute(task, arg, execName);
+ return ctx.task().execute(task, arg,
opts.get().withProjectionPredicate(prj.predicate()));
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -578,14 +572,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private IgniteInternalFuture<?> broadcastAsync0(IgniteRunnable job) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().runAsync(BROADCAST, job, prj.nodes(),
execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().runAsync(BROADCAST, job, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -612,14 +608,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <R> IgniteInternalFuture<Collection<R>>
broadcastAsync0(IgniteCallable<R> job) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().callAsync(BROADCAST,
Collections.singletonList(job), prj.nodes(), execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().callAsync(BROADCAST,
Collections.singletonList(job), opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -648,14 +646,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <R, T> IgniteInternalFuture<Collection<R>>
broadcastAsync0(IgniteClosure<T, R> job, @Nullable T arg) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().broadcast(job, arg, prj.nodes(), execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().broadcast(job, arg, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -682,14 +682,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private IgniteInternalFuture<?> runAsync0(IgniteRunnable job) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().runAsync(BALANCE, job, prj.nodes(), execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().runAsync(BALANCE, job, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -717,14 +719,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private IgniteInternalFuture<?> runAsync0(Collection<? extends
IgniteRunnable> jobs) {
- A.notEmpty(jobs, "jobs");
-
guard();
try {
- return ctx.closure().runAsync(BALANCE, jobs, prj.nodes(),
execName);
+ A.notEmpty(jobs, "jobs");
+
+ return ctx.closure().runAsync(BALANCE, jobs, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -742,7 +746,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R>
job, @Nullable T arg)
throws IgniteException {
- return (IgniteFuture<R>)createFuture(applyAsync0(job, arg));
+ return createFuture(applyAsync0(job, arg));
}
/**
@@ -753,14 +757,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <R, T> IgniteInternalFuture<R> applyAsync0(IgniteClosure<T, R>
job, @Nullable T arg) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().callAsync(job, arg, prj.nodes(), execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().callAsync(job, arg, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -777,7 +783,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job)
throws IgniteException {
- return (IgniteFuture<R>)createFuture(callAsync0(job));
+ return createFuture(callAsync0(job));
}
/**
@@ -787,14 +793,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <R> IgniteInternalFuture<R> callAsync0(IgniteCallable<R> job) {
- A.notNull(job, "job");
-
guard();
try {
- return ctx.closure().callAsync(BALANCE, job, prj.nodes(),
execName);
+ A.notNull(job, "job");
+
+ return ctx.closure().callAsync(BALANCE, job, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -812,7 +820,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R> IgniteFuture<Collection<R>> callAsync(
Collection<? extends IgniteCallable<R>> jobs) throws IgniteException {
- return (IgniteFuture<Collection<R>>)createFuture(callAsync0(jobs));
+ return createFuture(callAsync0(jobs));
}
/**
@@ -822,14 +830,16 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
* @return Internal future.
*/
private <R> IgniteInternalFuture<Collection<R>> callAsync0(Collection<?
extends IgniteCallable<R>> jobs) {
- A.notEmpty(jobs, "jobs");
-
guard();
try {
- return ctx.closure().callAsync(BALANCE, (Collection<? extends
Callable<R>>)jobs, prj.nodes(), execName);
+ A.notEmpty(jobs, "jobs");
+
+ return ctx.closure().callAsync(BALANCE, jobs, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -847,7 +857,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <T, R> IgniteFuture<Collection<R>>
applyAsync(IgniteClosure<T, R> job,
Collection<? extends T> args) throws IgniteException {
- return (IgniteFuture<Collection<R>>)createFuture(applyAsync0(job,
args));
+ return createFuture(applyAsync0(job, args));
}
/**
@@ -859,15 +869,17 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <T, R> IgniteInternalFuture<Collection<R>> applyAsync0(final
IgniteClosure<T, R> job,
@Nullable Collection<? extends T> args) {
- A.notNull(job, "job");
- A.notNull(args, "args");
-
guard();
try {
- return ctx.closure().callAsync(job, args, prj.nodes(), execName);
+ A.notNull(job, "job");
+ A.notNull(args, "args");
+
+ return ctx.closure().callAsync(job, args, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -885,7 +897,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends
IgniteCallable<R1>> jobs,
IgniteReducer<R1, R2> rdc) throws IgniteException {
- return (IgniteFuture<R2>)createFuture(callAsync0(jobs, rdc));
+ return createFuture(callAsync0(jobs, rdc));
}
/**
@@ -897,15 +909,17 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <R1, R2> IgniteInternalFuture<R2> callAsync0(Collection<? extends
IgniteCallable<R1>> jobs,
IgniteReducer<R1, R2> rdc) {
- A.notEmpty(jobs, "jobs");
- A.notNull(rdc, "rdc");
-
guard();
try {
- return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc,
prj.nodes(), execName);
+ A.notEmpty(jobs, "jobs");
+ A.notNull(rdc, "rdc");
+
+ return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -937,16 +951,18 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
*/
private <R1, R2, T> IgniteInternalFuture<R2> applyAsync0(IgniteClosure<T,
R1> job, Collection<? extends T> args,
IgniteReducer<R1, R2> rdc) {
- A.notNull(job, "job");
- A.notNull(rdc, "rdc");
- A.notNull(args, "args");
-
guard();
try {
- return ctx.closure().callAsync(job, args, rdc, prj.nodes(),
execName);
+ A.notNull(job, "job");
+ A.notNull(rdc, "rdc");
+ A.notNull(args, "args");
+
+ return ctx.closure().callAsync(job, args, rdc, opts.get());
}
finally {
+ opts.remove();
+
unguard();
}
}
@@ -970,7 +986,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
guard();
try {
- ctx.task().setThreadContext(TC_TASK_NAME, taskName);
+ opts.get().withName(taskName);
}
finally {
unguard();
@@ -986,7 +1002,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
guard();
try {
- ctx.task().setThreadContext(TC_TIMEOUT, timeout);
+ opts.get().withTimeout(timeout);
}
finally {
unguard();
@@ -1000,7 +1016,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
guard();
try {
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+ opts.get().withFailoverDisabled();
}
finally {
unguard();
@@ -1014,7 +1030,7 @@ public class IgniteComputeImpl extends
AsyncSupportAdapter<IgniteCompute>
guard();
try {
- ctx.task().setThreadContext(TC_NO_RESULT_CACHE, true);
+ opts.get().withResultCacheDisabled();
}
finally {
unguard();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
index cf1c78b17b7..1f9f87e3f48 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,7 +260,7 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- return addFuture(ctx.closure().callAsync(BALANCE, task,
prj.nodes()));
+ return addFuture(ctx.closure().callAsync(BALANCE, task,
options()));
}
finally {
ctx.gateway().readUnlock();
@@ -275,14 +276,16 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- IgniteInternalFuture<T> fut = ctx.closure().runAsync(BALANCE,
task, prj.nodes()).chain(
- new CX1<IgniteInternalFuture<?>, T>() {
- @Override public T applyx(IgniteInternalFuture<?> fut)
throws IgniteCheckedException {
- fut.get();
+ IgniteInternalFuture<T> fut = ctx.closure()
+ .runAsync(BALANCE, task, options())
+ .chain(
+ new CX1<IgniteInternalFuture<?>, T>() {
+ @Override public T applyx(IgniteInternalFuture<?> fut)
throws IgniteCheckedException {
+ fut.get();
- return res;
- }
- });
+ return res;
+ }
+ });
return addFuture(fut);
}
@@ -300,7 +303,7 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- return addFuture(ctx.closure().runAsync(BALANCE, task,
prj.nodes()));
+ return addFuture(ctx.closure().runAsync(BALANCE, task, options()));
}
finally {
ctx.gateway().readUnlock();
@@ -364,7 +367,7 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- fut = ctx.closure().callAsync(BALANCE, task, prj.nodes());
+ fut = ctx.closure().callAsync(BALANCE, task, options());
}
finally {
ctx.gateway().readUnlock();
@@ -497,7 +500,7 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- fut = ctx.closure().callAsync(BALANCE, cmd, prj.nodes());
+ fut = ctx.closure().callAsync(BALANCE, cmd, options());
}
finally {
ctx.gateway().readUnlock();
@@ -574,7 +577,7 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
ctx.gateway().readLock();
try {
- addFuture(ctx.closure().runAsync(BALANCE, cmd, prj.nodes()));
+ addFuture(ctx.closure().runAsync(BALANCE, cmd, options()));
}
finally {
ctx.gateway().readUnlock();
@@ -727,4 +730,9 @@ public class GridExecutorService implements
ExecutorService, Externalizable {
}
}
}
+
+ /** @return Task execution options with node projection automatically set.
*/
+ private TaskExecutionOptions options() {
+ return TaskExecutionOptions.options(prj.nodes());
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index caddc85d9b6..4067df9665f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -70,11 +70,13 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.affinityJob;
import static
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.unmarshall;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Data affinity processor.
@@ -606,7 +608,13 @@ public class GridAffinityProcessor extends
GridProcessorAdapter {
*/
private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String
cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
IgniteInternalFuture<GridTuple3<GridAffinityMessage,
GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
- .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer),
F.asList(n), true/*system pool*/, 0, false);
+ .callAsync(
+ BROADCAST,
+ affinityJob(cacheName, topVer),
+ options(F.asList(n))
+ .withFailoverDisabled()
+ .asSystemTask()
+ );
return fut.chain(
new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage,
GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 36378b008f8..9eb04c8b208 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -76,7 +76,6 @@ import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
-import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -174,8 +173,7 @@ import static
org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -1181,10 +1179,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
Collection<ClusterNode> srvNodes =
ctx.grid().cluster().forCacheNodes(name(), !near, near, false).nodes();
if (!srvNodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID, srvNodes);
-
return ctx.kernalContext().task().execute(
- new ClearTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), keys, near), null);
+ new ClearTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), keys, near),
+ null,
+ options(srvNodes)
+ );
}
else
return new GridFinishedFuture<>();
@@ -1211,8 +1210,12 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return new GridFinishedFuture<>();
}
- return ctx.closures().affinityRun(Collections.singleton(name()), part,
- new PartitionPreloadJob(ctx.name(), part), grp.nodes(), null);
+ return ctx.closures().affinityRun(
+ Collections.singleton(name()),
+ part,
+ new PartitionPreloadJob(ctx.name(), part),
+ options(grp.nodes())
+ );
}
/**
@@ -3818,12 +3821,13 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>();
- return ctx.closures().callAsyncNoFailover(BROADCAST,
+ return ctx.closures().callAsync(
+ BROADCAST,
new LoadKeysCallable<>(ctx.name(), keys, update, plc, keepBinary),
- nodes,
- true,
- 0,
- false);
+ options(nodes)
+ .withFailoverDisabled()
+ .asSystemTask()
+ );
}
/**
@@ -3929,8 +3933,6 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable
IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws IgniteCheckedException {
- ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
-
CacheOperationContext opCtx = ctx.operationContextPerCall();
ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
@@ -3944,10 +3946,10 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- ComputeTaskInternalFuture fut =
ctx.kernalContext().closure().callAsync(BROADCAST,
- Collections.singletonList(
- new LoadCacheJobV2<>(ctx.name(),
ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
- nodes);
+ IgniteInternalFuture<?> fut = ctx.kernalContext().closure().callAsync(
+ BROADCAST,
+ Collections.singletonList(new LoadCacheJobV2<>(ctx.name(),
ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
+ options(nodes));
return fut;
}
@@ -3982,10 +3984,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0);
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
return ctx.kernalContext().task().execute(
- new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(),
peekModes), null);
+ new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(),
peekModes),
+ null,
+ options(nodes)
+ );
}
/** {@inheritDoc} */
@@ -4003,10 +4006,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0L);
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
return ctx.kernalContext().task().execute(
- new SizeLongTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), peekModes), null);
+ new SizeLongTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), peekModes),
+ null,
+ options(nodes)
+ );
}
/** {@inheritDoc} */
@@ -4032,10 +4036,11 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>(0L);
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
return ctx.kernalContext().task().execute(
- new PartitionSizeLongTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), peekModes, part), null);
+ new PartitionSizeLongTask(ctx.name(),
ctx.affinity().affinityTopologyVersion(), peekModes, part),
+ null,
+ options(nodes)
+ );
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index c5f493b694d..2defa91c2b4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -73,6 +73,7 @@ import org.jetbrains.annotations.Nullable;
import static javax.cache.event.EventType.REMOVED;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
*
@@ -474,11 +475,13 @@ public class CacheDataStructuresManager extends
GridCacheManagerAdapter {
Collection<ClusterNode> nodes = cctx.discovery().nodes(topVer);
try {
- cctx.closures().callAsyncNoFailover(BROADCAST,
+ cctx.closures().callAsync(
+ BROADCAST,
new BlockSetCallable(cctx.name(), id),
- nodes,
- true,
- 0, false).get();
+ options(nodes)
+ .withFailoverDisabled()
+ .asSystemTask()
+ ).get();
// Separated cache will be destroyed after the set is blocked.
if (separated)
@@ -504,11 +507,13 @@ public class CacheDataStructuresManager extends
GridCacheManagerAdapter {
Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
try {
- cctx.closures().callAsyncNoFailover(BROADCAST,
+ cctx.closures().callAsync(
+ BROADCAST,
new RemoveSetDataCallable(cctx.name(), id, topVer),
- affNodes,
- true,
- 0, false).get();
+ options(affNodes)
+ .withFailoverDisabled()
+ .asSystemTask()
+ ).get();
}
catch (IgniteCheckedException e) {
if (e.hasCause(ClusterTopologyCheckedException.class)) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5214abaa84b..4a5abb62b25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -62,7 +62,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Distributed cache implementation.
@@ -184,10 +184,11 @@ public abstract class GridDistributedCacheAdapter<K, V>
extends GridCacheAdapter
Collection<ClusterNode> nodes =
ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID,
nodes);
-
retry = !ctx.kernalContext().task().execute(
- new RemoveAllTask(ctx.name(), topVer, skipStore,
keepBinary), null).get();
+ new RemoveAllTask(ctx.name(), topVer, skipStore,
keepBinary),
+ null,
+ options(nodes)
+ ).get();
}
}
while (ctx.affinity().affinityTopologyVersion().compareTo(topVer)
!= 0 || retry);
@@ -225,10 +226,11 @@ public abstract class GridDistributedCacheAdapter<K, V>
extends GridCacheAdapter
Collection<ClusterNode> nodes =
ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
IgniteInternalFuture<Boolean> rmvAll =
ctx.kernalContext().task().execute(
- new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary),
null);
+ new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary),
+ null,
+ options(nodes)
+ );
rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>()
{
@Override public void apply(IgniteInternalFuture<Boolean> fut)
{
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 5d655f5b760..212e5ba8b07 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -225,8 +225,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.Pa
import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
import static
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static
org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static
org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
@@ -1264,12 +1263,13 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
return cctx.kernalContext().closure()
- .callAsyncNoFailover(BROADCAST,
+ .callAsync(
+ BROADCAST,
new CancelSnapshotCallable(null, name),
- cctx.discovery().aliveServerNodes(),
- false,
- 0,
- true);
+ options(cctx.discovery().aliveServerNodes())
+ .withFailoverDisabled()
+ .withAuthenticationDisabled()
+ );
}
/**
@@ -1282,12 +1282,13 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
IgniteInternalFuture<Boolean> fut0 = cctx.kernalContext().closure()
- .callAsyncNoFailover(BROADCAST,
+ .callAsync(
+ BROADCAST,
new CancelSnapshotCallable(reqId, null),
- cctx.discovery().aliveServerNodes(),
- false,
- 0,
- true);
+ options(cctx.discovery().aliveServerNodes())
+ .withFailoverDisabled()
+ .withAuthenticationDisabled()
+ );
return new IgniteFutureImpl<>(fut0);
}
@@ -1431,12 +1432,13 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
Collection<ClusterNode> bltNodes =
F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
(node) -> CU.baselineNode(node, kctx0.state().clusterState()));
- kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
- kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
-
SnapshotMetadataCollectorTaskArg taskArg = new
SnapshotMetadataCollectorTaskArg(name, snpPath);
- kctx0.task().execute(SnapshotMetadataCollectorTask.class,
taskArg).listen(f0 -> {
+ kctx0.task().execute(
+ SnapshotMetadataCollectorTask.class,
+ taskArg,
+ options(bltNodes).withAuthenticationDisabled()
+ ).listen(f0 -> {
if (f0.error() == null) {
Map<ClusterNode, List<SnapshotMetadata>> metas = f0.result();
@@ -1511,14 +1513,14 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
return;
}
- kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
- kctx0.task().setThreadContext(TC_SUBGRID, new
ArrayList<>(metas.keySet()));
-
Class<? extends AbstractSnapshotVerificationTask> cls =
includeCustomHandlers ? SnapshotHandlerRestoreTask.class :
SnapshotPartitionsVerifyTask.class;
- kctx0.task().execute(cls, new
SnapshotPartitionsVerifyTaskArg(grps, metas, snpPath))
- .listen(f1 -> {
+ kctx0.task().execute(
+ cls,
+ new SnapshotPartitionsVerifyTaskArg(grps, metas,
snpPath),
+ options(new
ArrayList<>(metas.keySet())).withAuthenticationDisabled()
+ ).listen(f1 -> {
if (f1.error() == null)
res.onDone(f1.result());
else if (f1.error() instanceof
IgniteSnapshotVerifyException)
@@ -1693,12 +1695,13 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
throw new IgniteException("There is no alive server nodes
in the cluster");
return new
IgniteSnapshotFutureImpl(cctx.kernalContext().closure()
- .callAsyncNoFailover(BALANCE,
+ .callAsync(
+ BALANCE,
new CreateSnapshotCallable(name),
- Collections.singletonList(crd),
- false,
- 0,
- true));
+ options(Collections.singletonList(crd))
+ .withFailoverDisabled()
+ .withAuthenticationDisabled()
+ ));
}
ClusterSnapshotFuture snpFut0;
@@ -2323,10 +2326,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
Collection<ClusterNode> bltNodes =
F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
(node) -> CU.baselineNode(node,
cctx.kernalContext().state().clusterState()));
- cctx.kernalContext().task().setThreadContext(TC_SKIP_AUTH, true);
- cctx.kernalContext().task().setThreadContext(TC_SUBGRID, bltNodes);
-
- return new
IgniteFutureImpl<>(cctx.kernalContext().task().execute(taskCls, snpName));
+ return new IgniteFutureImpl<>(cctx.kernalContext().task().execute(
+ taskCls,
+ snpName,
+ options(bltNodes).withAuthenticationDisabled()
+ ));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 31f02b8cc61..a139361a2a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -135,6 +135,7 @@ import
org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -150,6 +151,7 @@ import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Query and index manager.
@@ -1868,7 +1870,13 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
// Get metadata from remote nodes.
if (!nodes.isEmpty())
- rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST,
Collections.singleton(job), nodes, true, 0);
+ rmtFut = cctx.closures().callAsync(
+ BROADCAST,
+ Collections.singleton(job),
+ options(nodes)
+ .withFailoverDisabled()
+ .asSystemTask()
+ );
// Get local metadata.
IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut =
cctx.closures().callLocalSafe(job, true);
@@ -1968,7 +1976,13 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
if (!allNodesNew)
return sqlMetadata();
- rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST,
Collections.singleton(job), nodes, true, 0);
+ rmtFut = cctx.closures().callAsync(
+ BROADCAST,
+ Collections.singleton(job),
+ options(nodes)
+ .withFailoverDisabled()
+ .asSystemTask()
+ );
}
// Get local metadata.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 068a35fb8a2..69bdd33378d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -75,10 +76,6 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
/**
*
@@ -147,28 +144,13 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
- * @return Task execution future.
- */
- public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
@Nullable Collection<? extends Runnable> jobs,
- @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
- return runAsync(mode, jobs, nodes, false, execName);
- }
-
- /**
- * @param mode Distribution mode.
- * @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Task execution future.
*/
- public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+ public ComputeTaskInternalFuture<?> runAsync(
+ GridClosureCallMode mode,
Collection<? extends Runnable> jobs,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- @Nullable String execName
+ TaskExecutionOptions opts
) {
assert mode != null;
assert !F.isEmpty(jobs) : jobs;
@@ -181,12 +163,10 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
new IgniteCheckedException("Closure processor cannot be
used on stopped grid: " + ctx.igniteInstanceName()));
}
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
+ return ctx.task().execute(new T1(mode, jobs), null, opts);
}
finally {
busyLock.readUnlock();
@@ -196,39 +176,13 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param mode Distribution mode.
* @param job Closure to execute.
- * @param nodes Grid nodes.
- * @return Task execution future.
- */
- public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Runnable job,
- @Nullable Collection<ClusterNode> nodes) {
- return runAsync(mode, job, nodes, null);
- }
-
- /**
- * @param mode Distribution mode.
- * @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
- * @return Task execution future.
- */
- public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Runnable job,
- @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
- return runAsync(mode, job, nodes, false, execName);
- }
-
- /**
- * @param mode Distribution mode.
- * @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Task execution future.
*/
- public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+ public ComputeTaskInternalFuture<?> runAsync(
+ GridClosureCallMode mode,
Runnable job,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- @Nullable String execName
+ TaskExecutionOptions opts
) {
assert mode != null;
assert job != null;
@@ -236,12 +190,10 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T2(mode, job), null, sys, execName);
+ return ctx.task().execute(new T2(mode, job), null, opts);
}
finally {
busyLock.readUnlock();
@@ -359,17 +311,16 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param rdc Reducer.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @param <R1> Type.
* @param <R2> Type.
* @return Reduced result.
*/
- public <R1, R2> ComputeTaskInternalFuture<R2>
forkjoinAsync(GridClosureCallMode mode,
+ public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(
+ GridClosureCallMode mode,
Collection<? extends Callable<R1>> jobs,
IgniteReducer<R1, R2> rdc,
- @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName
+ TaskExecutionOptions opts
) {
assert mode != null;
assert rdc != null;
@@ -378,12 +329,10 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T3<>(mode, jobs, rdc), null,
execName);
+ return ctx.task().execute(new T3<>(mode, jobs, rdc), null, opts);
}
finally {
busyLock.readUnlock();
@@ -393,112 +342,55 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param <R> Type.
- * @return Grid future for collection of closure results.
- */
- public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
- GridClosureCallMode mode,
- @Nullable Collection<? extends Callable<R>> jobs,
- @Nullable Collection<ClusterNode> nodes) {
- return callAsync(mode, jobs, nodes, null);
- }
-
- /**
- * @param mode Distribution mode.
- * @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
- public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+ public <R> IgniteInternalFuture<Collection<R>> callAsync(
GridClosureCallMode mode,
- @Nullable Collection<? extends Callable<R>> jobs,
- @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName) {
- return callAsync(mode, jobs, nodes, false, execName);
- }
-
- /**
- * @param mode Distribution mode.
- * @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Custom executor name.
- * @param <R> Type.
- * @return Grid future for collection of closure results.
- */
- public <R> ComputeTaskInternalFuture<Collection<R>>
callAsync(GridClosureCallMode mode,
Collection<? extends Callable<R>> jobs,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- @Nullable String execName
+ TaskExecutionOptions opts
) {
assert mode != null;
+ assert opts.isFailoverDisabled() || !F.isEmpty(jobs);
+
assert !F.isEmpty(jobs);
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
- return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class,
U.emptyTopologyException());
+ if (F.isEmpty(jobs) && opts.isFailoverDisabled())
+ return new GridFinishedFuture<>();
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
+ if (F.isEmpty(opts.projection()))
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class,
U.emptyTopologyException());
- return ctx.task().execute(new T6<>(mode, jobs), null, sys,
execName);
+ return ctx.task().execute(new T6<>(mode, jobs), null, opts);
}
finally {
busyLock.readUnlock();
}
}
- /**
- * @param mode Distribution mode.
- * @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param <R> Type.
- * @return Grid future for collection of closure results.
- */
- public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
- @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
- return callAsync(mode, job, nodes, null);
- }
-
- /**
- * @param mode Distribution mode.
- * @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
- * @param <R> Type.
- * @return Grid future for collection of closure results.
- */
- public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
- @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName) {
- return callAsync(mode, job, nodes, false, execName);
- }
-
/**
* @param cacheNames Cache names.
* @param partId Partition.
* @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Grid future for collection of closure results.
* @throws IgniteCheckedException If failed.
*/
public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull
Collection<String> cacheNames,
int partId,
Callable<R> job,
- @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName) throws IgniteCheckedException {
+ TaskExecutionOptions opts
+ ) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class,
U.emptyTopologyException());
final String cacheName = F.first(cacheNames);
@@ -509,10 +401,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T5(node, job, cacheNames, partId,
mapTopVer), null,
- false, execName);
+ return ctx.task().execute(new T5(node, job, cacheNames, partId,
mapTopVer), null, opts);
}
finally {
busyLock.readUnlock();
@@ -523,22 +412,22 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
* @param cacheNames Cache names.
* @param partId Partition.
* @param job Job.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Job future.
* @throws IgniteCheckedException If failed.
*/
- public ComputeTaskInternalFuture<?> affinityRun(@NotNull
Collection<String> cacheNames,
+ public ComputeTaskInternalFuture<?> affinityRun(
+ @NotNull Collection<String> cacheNames,
int partId,
Runnable job,
- @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName) throws IgniteCheckedException {
+ TaskExecutionOptions opts
+ ) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class,
U.emptyTopologyException());
final String cacheName = F.first(cacheNames);
@@ -549,10 +438,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T4(node, job, cacheNames, partId,
mapTopVer), null,
- false, execName);
+ return ctx.task().execute(new T4(node, job, cacheNames, partId,
mapTopVer), null, opts);
}
finally {
busyLock.readUnlock();
@@ -560,118 +446,30 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
}
/**
- * @param <R> Type.
* @param mode Distribution mode.
* @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param timeout Timeout.
- * @param skipAuth Skip authorization check.
- * @return Grid future for collection of closure results.
- */
- public <R> IgniteInternalFuture<R> callAsyncNoFailover(
- GridClosureCallMode mode,
- @Nullable Callable<R> job,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- long timeout,
- boolean skipAuth) {
- assert mode != null;
- assert timeout >= 0 : timeout;
-
- busyLock.readLock();
-
- try {
- if (job == null)
- return new GridFinishedFuture<>();
-
- if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(U.emptyTopologyException());
-
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- if (skipAuth)
- ctx.task().setThreadContext(TC_SKIP_AUTH, true);
-
- if (timeout > 0)
- ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
- return ctx.task().execute(new T7<>(mode, job), null, sys);
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /**
- * @param mode Distribution mode.
- * @param jobs Closures to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param timeout If greater than 0 limits task execution. Cannot be
negative.
* @param <R> Type.
+ * @param opts Task execution options.
* @return Grid future for collection of closure results.
*/
- public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(
+ public <R> IgniteInternalFuture<R> callAsync(
GridClosureCallMode mode,
- @Nullable Collection<? extends Callable<R>> jobs,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- long timeout
+ Callable<R> job,
+ TaskExecutionOptions opts
) {
assert mode != null;
- assert timeout >= 0 : timeout;
+ assert opts.isFailoverDisabled() || job != null;
busyLock.readLock();
try {
- if (F.isEmpty(jobs))
+ if (job == null && opts.isFailoverDisabled())
return new GridFinishedFuture<>();
- if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(U.emptyTopologyException());
-
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- if (timeout > 0)
- ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
- return ctx.task().execute(new T6<>(mode, jobs), null, sys);
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /**
- * @param mode Distribution mode.
- * @param job Closure to execute.
- * @param nodes Grid nodes.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Custom executor name.
- * @param <R> Type.
- * @return Grid future for collection of closure results.
- */
- public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
- Callable<R> job,
- @Nullable Collection<ClusterNode> nodes,
- boolean sys,
- @Nullable String execName
- ) {
- assert mode != null;
- assert job != null;
-
- busyLock.readLock();
-
- try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T7<>(mode, job), null, sys,
execName);
+ return ctx.task().execute(new T7<>(mode, job), null, opts);
}
finally {
busyLock.readUnlock();
@@ -681,21 +479,17 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param job Job closure.
* @param arg Optional job argument.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Grid future for execution result.
*/
- public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R>
job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+ public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R>
job, @Nullable T arg, TaskExecutionOptions opts) {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T8(job, arg), null, false, execName);
+ return ctx.task().execute(new T8(job, arg), null, opts);
}
finally {
busyLock.readUnlock();
@@ -705,21 +499,17 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param job Job closure.
* @param arg Optional job argument.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Grid future for execution result.
*/
- public <T, R> IgniteInternalFuture<Collection<R>>
broadcast(IgniteClosure<T, R> job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+ public <T, R> IgniteInternalFuture<Collection<R>>
broadcast(IgniteClosure<T, R> job, @Nullable T arg, TaskExecutionOptions opts) {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return new GridFinishedFuture<>(U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T11<>(job), arg, false, execName);
+ return ctx.task().execute(new T11<>(job), arg, opts);
}
finally {
busyLock.readUnlock();
@@ -729,24 +519,21 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* @param job Job closure.
* @param args Job arguments.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Grid future for execution result.
*/
- public <T, R> ComputeTaskInternalFuture<Collection<R>>
callAsync(IgniteClosure<T, R> job,
+ public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+ IgniteClosure<T, R> job,
@Nullable Collection<? extends T> args,
- @Nullable Collection<ClusterNode> nodes,
- @Nullable String execName
+ TaskExecutionOptions opts
) {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class,
U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T9<>(job, args), null, false,
execName);
+ return ctx.task().execute(new T9<>(job, args), null, opts);
}
finally {
busyLock.readUnlock();
@@ -757,22 +544,22 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
* @param job Job closure.
* @param args Job arguments.
* @param rdc Reducer.
- * @param nodes Grid nodes.
- * @param execName Custom executor name.
+ * @param opts Task execution options.
* @return Grid future for execution result.
*/
- public <T, R1, R2> ComputeTaskInternalFuture<R2>
callAsync(IgniteClosure<T, R1> job,
- Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable
Collection<ClusterNode> nodes,
- @Nullable String execName) {
+ public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(
+ IgniteClosure<T, R1> job,
+ Collection<? extends T> args,
+ IgniteReducer<R1, R2> rdc,
+ TaskExecutionOptions opts
+ ) {
busyLock.readLock();
try {
- if (F.isEmpty(nodes))
+ if (F.isEmpty(opts.projection()))
return ComputeTaskInternalFuture.finishedFuture(ctx,
T10.class, U.emptyTopologyException());
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T10<>(job, args, rdc), null, false,
execName);
+ return ctx.task().execute(new T10<>(job, args, rdc), null, opts);
}
finally {
busyLock.readUnlock();
@@ -1175,7 +962,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection,
Collection,String)}.
+ * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection,
TaskExecutionOptions)}.
*/
private class T1 extends TaskNoReduceAdapter<Void> implements
GridNoImplicitInjection {
/** */
@@ -1206,7 +993,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable,
Collection, String)}.
+ * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable,
TaskExecutionOptions)}.
*/
private class T2 extends TaskNoReduceAdapter<Void> implements
GridNoImplicitInjection {
/** */
@@ -1237,7 +1024,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode,
Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)}
+ * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode,
Collection, IgniteReducer, TaskExecutionOptions)}
*/
private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2>
implements GridNoImplicitInjection {
/** */
@@ -1423,7 +1210,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection,
Collection, String)}
+ * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection,
TaskExecutionOptions)}
*/
private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void,
Collection<R>> implements GridNoImplicitInjection {
/** */
@@ -1466,7 +1253,7 @@ public class GridClosureProcessor extends
GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable,
Collection, String)}
+ * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable,
TaskExecutionOptions)}
*/
private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R>
implements GridNoImplicitInjection {
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
index 48118378e1a..b579fb9ab5c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -31,16 +31,14 @@ import
org.apache.ignite.internal.processors.platform.client.ClientObjectNotific
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import static
org.apache.ignite.internal.processors.platform.client.ClientMessageParser.OP_COMPUTE_TASK_FINISHED;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Client compute task.
@@ -101,12 +99,17 @@ class ClientComputeTask implements ClientCloseableResource
{
IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? node
-> !node.isClient() :
F.nodeForNodeIds(nodeIds);
- task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
- task.setThreadContext(TC_TIMEOUT, timeout);
- task.setThreadContext(TC_NO_FAILOVER, (flags & NO_FAILOVER_FLAG_MASK)
!= 0);
- task.setThreadContext(TC_NO_RESULT_CACHE, (flags &
NO_RESULT_CACHE_FLAG_MASK) != 0);
+ TaskExecutionOptions opts = options()
+ .withProjectionPredicate(nodePredicate)
+ .withTimeout(timeout);
- taskFut = task.execute(taskName, arg);
+ if ((flags & NO_FAILOVER_FLAG_MASK) != 0)
+ opts.withFailoverDisabled();
+
+ if ((flags & NO_RESULT_CACHE_FLAG_MASK) != 0)
+ opts.withResultCacheDisabled();
+
+ taskFut = task.execute(taskName, arg, opts);
// Fail fast.
if (taskFut.isDone() && taskFut.error() != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 298133233d5..835bc54ef08 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -46,8 +46,6 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-
/**
* Interop compute.
*/
@@ -291,8 +289,6 @@ public class PlatformCompute extends PlatformAbstractTarget
{
((PlatformBalancingMultiClosureTask)task).jobs(jobs);
}
- platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID,
computeForPlatform.clusterGroup().nodes());
-
return executeNative0(task);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 3e46c80290b..3ff4f8ab5f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -121,7 +121,7 @@ import static
org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_S
import static
org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_UPDATE_TLL;
import static
org.apache.ignite.internal.processors.rest.GridRestCommand.DESTROY_CACHE;
import static
org.apache.ignite.internal.processors.rest.GridRestCommand.GET_OR_CREATE_CACHE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Command handler for API requests.
@@ -762,11 +762,11 @@ public class GridCacheCommandHandler extends
GridRestCommandHandlerAdapter {
else {
ClusterGroup prj =
ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
- return ctx.closure().callAsync(BALANCE,
+ return ctx.closure().callAsync(
+ BALANCE,
new FlaggedCacheOperationCallable(cacheName, cacheFlags, op,
key),
- prj.nodes());
+ options(prj.nodes()).withFailoverDisabled()
+ );
}
}
@@ -796,11 +796,11 @@ public class GridCacheCommandHandler extends
GridRestCommandHandlerAdapter {
else {
ClusterGroup prj =
ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
- return ctx.closure().callAsync(BALANCE,
+ return ctx.closure().callAsync(
+ BALANCE,
new CacheOperationCallable(cacheName, op, key),
- prj.nodes());
+ options(prj.nodes()).withFailoverDisabled()
+ );
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 4994307b60e..c92986e14b3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -77,8 +77,7 @@ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXE;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.NOOP;
import static
org.apache.ignite.internal.processors.rest.GridRestCommand.RESULT;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
@@ -216,22 +215,20 @@ public class GridTaskCommandHandler extends
GridRestCommandHandlerAdapter {
final IgniteInternalFuture<Object> taskFut;
if (locExec) {
- ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
Object arg = !F.isEmpty(params) ? params.size() == 1 ?
params.get(0) : params.toArray() : null;
- taskFut = ctx.task().execute(name, arg);
+ taskFut = ctx.task().execute(name, arg,
options().withTimeout(timeout));
}
else {
// Using predicate instead of node intentionally
// in order to provide user well-structured
EmptyProjectionException.
ClusterGroup prj =
ctx.grid().cluster().forPredicate(F.nodeForNodeId(req.destinationId()));
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
taskFut = ctx.closure().callAsync(
BALANCE,
- new ExeCallable(name, params, timeout), prj.nodes());
+ new ExeCallable(name, params, timeout),
+ options(prj.nodes()).withFailoverDisabled()
+ );
}
if (async) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index f9fa1cd9db1..882c46fb85a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryArray;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.services.PlatformService;
@@ -64,7 +63,8 @@ import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceCallInterceptor;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
+import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Wrapper for making {@link org.apache.ignite.services.Service} class proxies.
@@ -218,16 +218,16 @@ public class GridServiceProxy<T> implements Serializable {
}
}
else {
- ctx.task().setThreadContext(TC_IO_POLICY,
GridIoPolicy.SERVICE_POOL);
-
// Execute service remotely.
- return
unmarshalResult(ctx.closure().callAsyncNoFailover(
- GridClosureCallMode.BROADCAST,
- new ServiceProxyCallable(methodName(mtd), name,
mtd.getParameterTypes(), args, callAttrs),
- Collections.singleton(node),
- false,
- waitTimeout,
- true).get());
+ return unmarshalResult(ctx.closure().callAsync(
+ GridClosureCallMode.BROADCAST,
+ new ServiceProxyCallable(methodName(mtd),
name, mtd.getParameterTypes(), args, callAttrs),
+ options(Collections.singleton(node))
+ .withPool(SERVICE_POOL)
+ .withFailoverDisabled()
+ .withTimeout(waitTimeout)
+ .withAuthenticationDisabled()
+ ).get());
}
}
catch (InvocationTargetException e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6d4cd232bb4..46425bf4a32 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.task;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -106,11 +105,7 @@ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
import static
org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* This class defines task processor.
@@ -128,10 +123,6 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
/** Wait for 5 seconds to allow discovery to take effect (best effort). */
private static final long DISCO_TIMEOUT = 5000;
- /** */
- private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP =
- new EnumMap<>(GridTaskThreadContextKey.class);
-
/** */
private final Marshaller marsh;
@@ -150,9 +141,6 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
/** Total executed tasks metric. */
private final LongAdderMetric execTasks;
- /** */
- private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx =
new ThreadLocal<>();
-
/** */
private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
@@ -359,52 +347,6 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
log.debug("Stopped task processor.");
}
- /**
- * Sets the thread-local context value.
- *
- * @param key Key.
- * @param val Value.
- */
- public void setThreadContext(GridTaskThreadContextKey key, Object val) {
- assert key != null;
- assert val != null;
-
- Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
- // NOTE: access to 'map' is always single-threaded since it's held
- // in a thread local.
- if (map == null)
- thCtx.set(map = new EnumMap<>(GridTaskThreadContextKey.class));
-
- map.put(key, val);
- }
-
- /**
- * Sets the thread-local context value, if it is not null.
- *
- * @param key Key.
- * @param val Value.
- */
- public void setThreadContextIfNotNull(GridTaskThreadContextKey key,
@Nullable Object val) {
- if (val != null)
- setThreadContext(key, val);
- }
-
- /**
- * Gets thread-local context value for a given {@code key}.
- *
- * @param key Thread-local context key.
- * @return Thread-local context value associated with given {@code key} -
or {@code null}
- * if value with given {@code key} doesn't exist.
- */
- @Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) {
- assert (key != null);
-
- Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
- return map == null ? null : (T)map.get(key);
- }
-
/**
* Gets currently used deployments.
*
@@ -446,19 +388,22 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends
ComputeTask<T, R>> taskCls, @Nullable T arg) {
- return execute(taskCls, arg, null);
+ return execute(taskCls, arg, options());
}
/**
* @param taskCls Task class.
* @param arg Optional execution argument.
- * @param execName Name of the custom executor.
+ * @param opts Task execution options.
* @return Task future.
* @param <T> Task argument type.
* @param <R> Task return value type.
*/
- public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends
ComputeTask<T, R>> taskCls, @Nullable T arg,
- @Nullable String execName) {
+ public <T, R> ComputeTaskInternalFuture<R> execute(
+ Class<? extends ComputeTask<T, R>> taskCls,
+ @Nullable T arg,
+ TaskExecutionOptions opts
+ ) {
assert taskCls != null;
lock.readLock();
@@ -467,8 +412,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
if (stopping)
throw new IllegalStateException("Failed to execute task due to
grid shutdown: " + taskCls);
- return startTask(null, taskCls, null,
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
- false, execName);
+ return startTask(null, taskCls, null,
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
}
finally {
lock.readUnlock();
@@ -483,52 +427,25 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task,
@Nullable T arg) {
- return execute(task, arg, false, null);
- }
-
- /**
- * @param task Actual task.
- * @param arg Optional task argument.
- * @param execName Name of the custom executor.
- * @return Task future.
- * @param <T> Task argument type.
- * @param <R> Task return value type.
- */
- public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task,
@Nullable T arg, String execName) {
- return execute(task, arg, false, execName);
+ return execute(task, arg, options());
}
/**
* @param task Actual task.
* @param arg Optional task argument.
- * @param sys If {@code true}, then system pool will be used.
+ * @param opts Task execution options.
* @return Task future.
* @param <T> Task argument type.
* @param <R> Task return value type.
*/
- public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task,
@Nullable T arg, boolean sys) {
- return execute(task, arg, sys, null);
- }
-
- /**
- * @param task Actual task.
- * @param arg Optional task argument.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Name of the custom executor.
- * @return Task future.
- * @param <T> Task argument type.
- * @param <R> Task return value type.
- */
- public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task,
@Nullable T arg, boolean sys,
- @Nullable String execName) {
+ public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task,
@Nullable T arg, TaskExecutionOptions opts) {
lock.readLock();
try {
if (stopping)
throw new IllegalStateException("Failed to execute task due to
grid shutdown: " + task);
- return startTask(null, null, task,
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
- sys, execName);
+ return startTask(null, null, task,
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
}
finally {
lock.readUnlock();
@@ -567,18 +484,18 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(String taskName,
@Nullable T arg) {
- return execute(taskName, arg, null);
+ return execute(taskName, arg, options());
}
/**
* @param taskName Task name.
* @param arg Optional execution argument.
- * @param execName Name of the custom executor.
+ * @param opts Task execution options.
* @return Task future.
* @param <T> Task argument type.
* @param <R> Task return value type.
*/
- public <T, R> ComputeTaskInternalFuture<R> execute(String taskName,
@Nullable T arg, @Nullable String execName) {
+ public <T, R> ComputeTaskInternalFuture<R> execute(String taskName,
@Nullable T arg, @Nullable TaskExecutionOptions opts) {
assert taskName != null;
lock.readLock();
@@ -587,8 +504,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
if (stopping)
throw new IllegalStateException("Failed to execute task due to
grid shutdown: " + taskName);
- return startTask(taskName, null, null,
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
- false, execName);
+ return startTask(taskName, null, null,
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
}
finally {
lock.readUnlock();
@@ -599,10 +515,8 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* @param taskName Task name.
* @param taskCls Task class.
* @param task Task.
- * @param sesId Task session ID.
* @param arg Optional task argument.
- * @param sys If {@code true}, then system pool will be used.
- * @param execName Name of the custom executor.
+ * @param opts Task execution options.
* @return Task future.
*/
private <T, R> ComputeTaskInternalFuture<R> startTask(
@@ -611,8 +525,8 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
@Nullable ComputeTask<T, R> task,
IgniteUuid sesId,
@Nullable T arg,
- boolean sys,
- @Nullable String execName) {
+ @Nullable TaskExecutionOptions opts
+ ) {
assert sesId != null;
String taskClsName;
@@ -626,21 +540,12 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
else
taskClsName = taskCls != null ? taskCls.getName() : taskName;
- // Get values from thread-local context.
- Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
- if (map == null)
- map = EMPTY_ENUM_MAP;
- else
- // Reset thread-local context.
- thCtx.set(null);
-
- if (map.get(TC_SKIP_AUTH) == null)
+ if (!opts.isAuthenticationDisabled())
ctx.security().authorize(taskClsName,
SecurityPermission.TASK_EXECUTE);
- Long timeout = (Long)map.get(TC_TIMEOUT);
+ assert opts.timeout() >= 0;
- long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE :
timeout;
+ long timeout0 = opts.timeout() == 0 ? Long.MAX_VALUE : opts.timeout();
long startTime = U.currentTimeMillis();
@@ -694,7 +599,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
throw new IgniteDeploymentCheckedException("Failed to
auto-deploy task " +
"(was task (re|un)deployed?): " + taskCls);
- taskName = taskName(dep, taskCls, map);
+ taskName = taskName(dep, taskCls, opts);
}
catch (IgniteCheckedException e) {
taskName = taskCls.getName();
@@ -734,7 +639,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
throw new IgniteDeploymentCheckedException("Failed to
auto-deploy task " +
"(was task (re|un)deployed?): " + cls);
- taskName = taskName(dep, taskCls, map);
+ taskName = taskName(dep, taskCls, opts);
}
catch (IgniteCheckedException e) {
taskName = task.getClass().getName();
@@ -753,10 +658,10 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
Collection<UUID> top = null;
- final IgnitePredicate<ClusterNode> topPred =
(IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE);
+ final IgnitePredicate<ClusterNode> topPred =
opts.projectionPredicate();
if (topPred == null) {
- final Collection<ClusterNode> nodes =
(Collection<ClusterNode>)map.get(TC_SUBGRID);
+ final Collection<ClusterNode> nodes = opts.projection();
top = nodes != null ? F.nodeIds(nodes) : null;
}
@@ -783,7 +688,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
emptyMap(),
fullSup,
internal,
- execName,
+ opts.executor(),
ctx.security().enabled() ?
ctx.security().securityContext().subject().login() : null
);
@@ -813,7 +718,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
task,
dep,
new TaskEventListener(),
- map,
+ opts,
securitySubjectId(ctx));
GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId,
taskWorker);
@@ -842,7 +747,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
if (dep.annotation(taskCls, ComputeTaskMapAsync.class) !=
null) {
try {
// Start task execution in another thread.
- if (sys)
+ if (opts.isSystemTask())
ctx.pools().getSystemExecutorService().execute(taskWorker);
else
ctx.pools().getExecutorService().execute(taskWorker);
@@ -906,15 +811,14 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
*
* @param dep Deployment.
* @param cls Class.
- * @param map Thread context map.
+ * @param opts Task execution options.
* @return Task name.
* @throws IgniteCheckedException If {@link @ComputeTaskName} annotation
is found, but has empty value.
*/
- private String taskName(GridDeployment dep, Class<?> cls,
- Map<GridTaskThreadContextKey, Object> map) throws
IgniteCheckedException {
+ private String taskName(GridDeployment dep, Class<?> cls,
TaskExecutionOptions opts) throws IgniteCheckedException {
assert dep != null;
assert cls != null;
- assert map != null;
+ assert opts != null;
String taskName;
@@ -928,7 +832,7 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
" cannot be empty for class: " + cls);
}
else
- taskName = map.containsKey(TC_TASK_NAME) ?
(String)map.get(TC_TASK_NAME) : cls.getName();
+ taskName = opts.name().orElse(cls.getName());
return taskName;
}
@@ -1015,7 +919,7 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* @param ses Task session.
* @throws IgniteCheckedException If send to any of the jobs failed.
*/
- @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
"BusyWait"})
+ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl
ses)
throws IgniteCheckedException {
assert attrs != null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
deleted file mode 100644
index 59249eca9c9..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.task;
-
-/**
- * Defines keys for thread-local context in task processor.
- */
-public enum GridTaskThreadContextKey {
- /** Task name. */
- TC_TASK_NAME,
-
- /** No failover flag. */
- TC_NO_FAILOVER,
-
- /** No result cache flag. */
- TC_NO_RESULT_CACHE,
-
- /** Projection for the task. */
- TC_SUBGRID,
-
- /** Projection predicate for the task. */
- TC_SUBGRID_PREDICATE,
-
- /** Timeout in milliseconds associated with the task. */
- TC_TIMEOUT,
-
- /** IO manager policy. */
- TC_IO_POLICY,
-
- /** Skip authorization for the task. */
- TC_SKIP_AUTH
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 8b8edcf3c50..fff823d2d37 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -107,9 +107,6 @@ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
import static
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
import static
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
import static
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
/**
* Grid task worker. Handles full task life cycle.
@@ -177,8 +174,8 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
/** Task class. */
private final Class<?> taskCls;
- /** Optional subgrid. */
- private final Map<GridTaskThreadContextKey, Object> thCtx;
+ /** Task execution options. */
+ private final TaskExecutionOptions opts;
/** */
private ComputeTask<T, R> task;
@@ -285,7 +282,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
* @param task Task instance that might be null.
* @param dep Deployed task.
* @param evtLsnr Event listener.
- * @param thCtx Thread-local context from task processor.
+ * @param opts Task execution options.
* @param subjId Subject ID.
*/
GridTaskWorker(
@@ -297,7 +294,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
@Nullable ComputeTask<T, R> task,
GridDeployment dep,
GridTaskEventListener evtLsnr,
- @Nullable Map<GridTaskThreadContextKey, Object> thCtx,
+ TaskExecutionOptions opts,
UUID subjId) {
super(ctx.config().getIgniteInstanceName(), "grid-task-worker",
ctx.log(GridTaskWorker.class));
@@ -314,7 +311,7 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
this.task = task;
this.dep = dep;
this.evtLsnr = evtLsnr;
- this.thCtx = thCtx;
+ this.opts = opts;
this.subjId = subjId;
log = U.logger(ctx, logRef, this);
@@ -323,13 +320,9 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
boolean noResCacheAnnotation = dep.annotation(taskCls,
ComputeTaskNoResultCache.class) != null;
- Boolean noResCacheCtxFlag = getThreadContext(TC_NO_RESULT_CACHE);
+ resCache = !(noResCacheAnnotation || opts.isResultCacheDisabled());
- resCache = !(noResCacheAnnotation || (noResCacheCtxFlag != null &&
noResCacheCtxFlag));
-
- Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
-
- this.noFailover = noFailover != null ? noFailover : false;
+ this.noFailover = opts.isFailoverDisabled();
if (task instanceof AffinityTask) {
AffinityTask affTask = (AffinityTask)task;
@@ -356,16 +349,6 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
}
}
- /**
- * Gets value from thread-local context.
- *
- * @param key Thread-local context key.
- * @return Thread-local context value, if any.
- */
- @Nullable private <V> V getThreadContext(GridTaskThreadContextKey key) {
- return thCtx == null ? null : (V)thCtx.get(key);
- }
-
/**
* @return Task session ID.
*/
@@ -1448,14 +1431,8 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
if (internal)
plc = MANAGEMENT_POOL;
- else {
- Byte ctxPlc = getThreadContext(TC_IO_POLICY);
-
- if (ctxPlc != null)
- plc = ctxPlc;
- else
- plc = PUBLIC_POOL;
- }
+ else
+ plc = opts.pool().orElse(PUBLIC_POOL);
// Send job execution request.
ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
new file mode 100644
index 00000000000..f30630d4423
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/** Options of a task execution; created by the {@code options()} factories and configured through chained calls. */
+public class TaskExecutionOptions {
+ /** Task name, {@code null} if not set. */
+ private String name;
+
+ /** Task timeout, {@code 0} unless set via {@link #withTimeout(long)}. */
+ private long timeout;
+
+ /** Executor name, {@code null} unless set via {@link #withExecutor(String)}. */
+ private String execName;
+
+ /** Pool identifier, {@code null} if not set. */
+ private Byte pool;
+
+ /** Nodes projection, {@code null} if not set. */
+ private Collection<ClusterNode> projection;
+
+ /** Projection predicate, {@code null} if not set. */
+ private IgnitePredicate<ClusterNode> projectionPredicate;
+
+ /** Failover disabled flag, {@code false} by default. */
+ private boolean isFailoverDisabled;
+
+ /** Result cache disabled flag, {@code false} by default. */
+ private boolean isResultCacheDisabled;
+
+ /** System task flag, {@code false} by default. */
+ private boolean isSysTask;
+
+ /** Authentication disabled flag, {@code false} by default. */
+ private boolean isAuthDisabled;
+
+ /** Private constructor: instances are obtained through the {@code options()} factory methods. */
+ private TaskExecutionOptions() {}
+
+ /** @return New options instance with every value left at its default. */
+ public static TaskExecutionOptions options() {
+ return new TaskExecutionOptions();
+ }
+
+ /** @return New options instance with the specified projection already applied. */
+ public static TaskExecutionOptions options(Collection<ClusterNode>
projection) {
+ return new TaskExecutionOptions().withProjection(projection);
+ }
+
+ /** @return Timeout value, {@code 0} if it was never set. */
+ public long timeout() {
+ return timeout;
+ }
+
+ /** Stores the timeout and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions withTimeout(long timeout) {
+ this.timeout = timeout;
+
+ return this;
+ }
+
+ /** @return Task name, or an empty {@link Optional} if it was not set. */
+ public Optional<String> name() {
+ return Optional.ofNullable(name);
+ }
+
+ /** Stores the task name and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions withName(String name) {
+ this.name = name;
+
+ return this;
+ }
+
+ /** @return Nodes projection, {@code null} if it was not set. */
+ public Collection<ClusterNode> projection() {
+ return projection;
+ }
+
+ /** Stores the nodes projection and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions withProjection(Collection<ClusterNode>
projection) {
+ this.projection = projection;
+
+ return this;
+ }
+
+ /** @return Projection predicate, {@code null} if it was not set. */
+ public IgnitePredicate<ClusterNode> projectionPredicate() {
+ return projectionPredicate;
+ }
+
+ /** Stores the projection predicate and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions
withProjectionPredicate(IgnitePredicate<ClusterNode> projectionPredicate) {
+ this.projectionPredicate = projectionPredicate;
+
+ return this;
+ }
+
+ /** @return Executor name, {@code null} if it was not set. */
+ public String executor() {
+ return execName;
+ }
+
+ /** Stores the executor name and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions withExecutor(String execName) {
+ this.execName = execName;
+
+ return this;
+ }
+
+ /** @return Pool identifier, or an empty {@link Optional} if it was not set. */
+ public Optional<Byte> pool() {
+ return Optional.ofNullable(pool);
+ }
+
+ /** Stores the pool identifier and returns {@code this} to allow call chaining. */
+ public TaskExecutionOptions withPool(byte pool) {
+ this.pool = pool;
+
+ return this;
+ }
+
+ /** @return {@code true} if {@link #withFailoverDisabled()} was called on this instance. */
+ public boolean isFailoverDisabled() {
+ return isFailoverDisabled;
+ }
+
+ /** Raises the failover disabled flag and returns {@code this}; the flag cannot be cleared afterwards. */
+ public TaskExecutionOptions withFailoverDisabled() {
+ isFailoverDisabled = true;
+
+ return this;
+ }
+
+ /** @return {@code true} if {@link #withResultCacheDisabled()} was called on this instance. */
+ public boolean isResultCacheDisabled() {
+ return isResultCacheDisabled;
+ }
+
+ /** Raises the result cache disabled flag and returns {@code this}; the flag cannot be cleared afterwards. */
+ public TaskExecutionOptions withResultCacheDisabled() {
+ isResultCacheDisabled = true;
+
+ return this;
+ }
+
+ /** @return {@code true} if {@link #asSystemTask()} was called on this instance. */
+ public boolean isSystemTask() {
+ return isSysTask;
+ }
+
+ /** Raises the system task flag and returns {@code this}; the flag cannot be cleared afterwards. */
+ public TaskExecutionOptions asSystemTask() {
+ isSysTask = true;
+
+ return this;
+ }
+
+ /** @return {@code true} if {@link #withAuthenticationDisabled()} was called on this instance. */
+ public boolean isAuthenticationDisabled() {
+ return isAuthDisabled;
+ }
+
+ /** Raises the authentication disabled flag and returns {@code this}; the flag cannot be cleared afterwards. */
+ public TaskExecutionOptions withAuthenticationDisabled() {
+ isAuthDisabled = true;
+
+ return this;
+ }
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 2d74bc0a629..be011d4bf45 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1791,10 +1791,10 @@
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch
org.apache.ignite.internal.processors.service.ServiceUndeploymentRequest
org.apache.ignite.internal.processors.task.GridTaskProcessor$1
-org.apache.ignite.internal.processors.task.GridTaskThreadContextKey
org.apache.ignite.internal.processors.task.GridTaskWorker$2
org.apache.ignite.internal.processors.task.GridTaskWorker$4
org.apache.ignite.internal.processors.task.GridTaskWorker$State
+org.apache.ignite.internal.processors.task.TaskExecutionOptions
org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum
org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor$2
org.apache.ignite.internal.processors.tracing.SpanType
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index efe8baf1d6a..a1d5a5bc667 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -58,6 +58,9 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
+
/**
* Continuous task test.
*/
@@ -203,15 +206,18 @@ public class GridContinuousTaskSelfTest extends
GridCommonAbstractTest {
try {
IgniteEx ign = startGrid(0);
- ComputeTaskInternalFuture<String> fut =
ign.context().closure().callAsync(GridClosureCallMode.BALANCE, new
Callable<String>() {
- /** */
- @IgniteInstanceResource
- private IgniteEx g;
+ IgniteInternalFuture<String> fut =
ign.context().closure().callAsync(
+ BALANCE,
+ new Callable<String>() {
+ @IgniteInstanceResource
+ private IgniteEx g;
- @Override public String call() throws Exception {
- return
g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
- }
- }, ign.cluster().nodes());
+ @Override public String call() throws Exception {
+ return
g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
+ }
+ },
+ options(ign.cluster().nodes())
+ );
assertEquals("DONE", fut.get(3000));
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
index 0b9b7802b60..28c378d2aac 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
+import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
*
@@ -82,7 +83,8 @@ public class IgniteComputeTopologyExceptionTest extends
GridCommonAbstractTest {
stopGrid(1);
- IgniteInternalFuture<?> fut =
ignite0.context().closure().callAsyncNoFailover(BALANCE,
+ IgniteInternalFuture<?> fut = ignite0.context().closure().callAsync(
+ BALANCE,
new IgniteCallable<Object>() {
@Override public Object call() throws Exception {
fail("Should not be called.");
@@ -90,9 +92,8 @@ public class IgniteComputeTopologyExceptionTest extends
GridCommonAbstractTest {
return null;
}
},
- nodes,
- false,
- 0, false);
+ options(nodes).withFailoverDisabled()
+ );
try {
fut.get();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
new file mode 100644
index 00000000000..5e2fc4dbdb6
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** Tests which task name is observed by computations started through {@link IgniteCompute}, with and without {@code withName}. */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+ /** Task name passed to {@code withName} in the tests. */
+ private static final String TEST_TASK_NAME = "test-name";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid();
+
+ grid().createCache(DEFAULT_CACHE_NAME).put(0, 0);
+ }
+
+ /** Checks that the name set via {@code withName} on a client node is observed by the {@code affinityCall} computation. */
+ @Test
+ public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws
Exception {
+ try (IgniteEx cli = startClientGrid(1)) {
+ cli.compute().withName(TEST_TASK_NAME).affinityCall(
+ DEFAULT_CACHE_NAME,
+ 0,
+ new TestCallable(TEST_TASK_NAME)
+ );
+ }
+ }
+
+ /** Checks that a name set in one thread is not applied to a call made through the same compute from another thread. */
+ @Test
+ public void testComputeSharedAcrossMultipleThreads() throws Exception {
+ IgniteCompute compute = grid().compute();
+
+ compute.withName(TEST_TASK_NAME);
+
+ runAsync(() -> compute.call(new
TestCallable(TestCallable.class.getName()))).get();
+
+ compute.call(new TestCallable(TEST_TASK_NAME));
+ }
+
+ /** Runs {@code check} for each listed {@link IgniteCompute} method, in both synchronous and asynchronous form. */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTaskExecutionOptionsReset() throws Exception {
+ check(ComputeTask.class, (c, t) -> c.execute(t, null));
+ check(ComputeTask.class, (c, t) -> c.executeAsync(t, null).get());
+
+ check(IgniteCallable.class, IgniteCompute::call);
+ check(IgniteCallable.class, (c, t) -> c.callAsync(t).get());
+
+ check(IgniteCallable.class, (c, t) ->
c.call((toList((IgniteCallable<Void>)t))));
+ check(IgniteCallable.class, (c, t) ->
c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+ check(IgniteCallable.class, (c, t) ->
c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+ check(IgniteCallable.class, (c, t) ->
c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+ check(IgniteRunnable.class, IgniteCompute::run);
+ check(IgniteRunnable.class, (c, t) -> c.runAsync(t).get());
+
+ check(IgniteRunnable.class, (c, t) -> c.run(toList(t)));
+ check(IgniteRunnable.class, (c, t) -> c.runAsync(toList(t)).get());
+
+ check(IgniteRunnable.class, IgniteCompute::broadcast);
+ check(IgniteRunnable.class, (c, t) -> c.broadcastAsync(t).get());
+
+ check(IgniteCallable.class, IgniteCompute::broadcast);
+ check(IgniteCallable.class, (c, t) -> c.broadcastAsync(t).get());
+
+ check(IgniteClosure.class, ((c, t) -> c.broadcast(t, null)));
+ check(IgniteClosure.class, (c, t) -> c.broadcastAsync(t, null).get());
+
+ check(IgniteClosure.class, (c, t) -> c.apply(t, (Void)null));
+ check(IgniteClosure.class, (c, t) -> c.applyAsync(t,
(Void)null).get());
+
+ check(IgniteClosure.class, (c, t) -> c.apply(t, singletonList(null),
new TestReducer()));
+ check(IgniteClosure.class, (c, t) -> c.applyAsync(t,
singletonList(null), new TestReducer()).get());
+
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRun(DEFAULT_CACHE_NAME, "key", t));
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", t).get());
+
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", t));
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", t).get());
+
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, t));
+ check(IgniteRunnable.class, (c, t) ->
c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, t).get());
+
+ check(IgniteCallable.class, (c, t) ->
c.affinityCall(DEFAULT_CACHE_NAME, "key", t));
+ check(IgniteCallable.class, (c, t) ->
c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", t).get());
+
+ check(IgniteCallable.class, (c, t) ->
c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", t));
+ check(IgniteCallable.class, (c, t) ->
c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", t).get());
+
+ check(IgniteCallable.class, (c, t) ->
c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, t));
+ check(IgniteCallable.class, (c, t) ->
c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, t).get());
+ }
+
+ /** Runs a named call followed by an unnamed one, then a named call expected to fail followed by another unnamed one. */
+ public <T> void check(Class<T> taskCls, ComputationConsumer<T> consumer)
throws Exception {
+ consumer.accept(grid().compute().withName(TEST_TASK_NAME),
getComputationObject(taskCls, TEST_TASK_NAME));
+ consumer.accept(grid().compute(), getComputationObject(taskCls, null));
+
+ assertThrows(() ->
consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+ consumer.accept(grid().compute(), getComputationObject(taskCls, null));
+ }
+
+ /** Callable that verifies the task name of its session when called. */
+ private static class TestCallable extends TaskNameChecker implements
IgniteCallable<Void> {
+ /** @param expName Expected task name, {@code null} to expect the name of this object's class. */
+ public TestCallable(String expName) {
+ super(expName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ checkName();
+
+ return null;
+ }
+ }
+
+ /** Runnable that verifies the task name of its session when run. */
+ private static class TestRunnable extends TaskNameChecker implements
IgniteRunnable {
+ /** @param expName Expected task name, {@code null} to expect the name of this object's class. */
+ public TestRunnable(String expName) {
+ super(expName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ checkName();
+ }
+ }
+
+ /** Closure that verifies the task name of its session when applied. */
+ private static class TestClosure extends TaskNameChecker implements
IgniteClosure<Void, Void> {
+ /** @param expName Expected task name, {@code null} to expect the name of this object's class. */
+ public TestClosure(String expName) {
+ super(expName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void apply(Void arg) {
+ checkName();
+
+ return null;
+ }
+ }
+
+ /** Task that maps a single {@link TestJob} to the first node of the subgrid and reduces to {@code null}. */
+ private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+ /** Expected task name handed over to the job. */
+ private final String name;
+
+ /** @param name Expected task name, {@code null} to expect the name of this task's class. */
+ public TestTask(String name) {
+ this.name = name == null ? getClass().getName() : name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+ List<ClusterNode> subgrid,
+ @Nullable Void arg
+ ) throws IgniteException {
+ return singletonMap(new TestJob(name), subgrid.iterator().next());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List list) throws
IgniteException {
+ return null;
+ }
+
+ /** Job that verifies the task name of its session when executed. */
+ private static class TestJob extends TaskNameChecker implements
ComputeJob {
+ /** @param expName Expected task name. */
+ public TestJob(String expName) {
+ super(expName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ checkName();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ // No-op.
+ }
+ }
+ }
+
+ /** Base class that compares the task name reported by the session with the expected one. */
+ private static class TaskNameChecker {
+ /** Task session resource. */
+ @TaskSessionResource
+ private ComputeTaskSession ses;
+
+ /** Expected task name. */
+ private final String expName;
+
+ /** @param expName Expected task name, {@code null} to use the name of this object's runtime class. */
+ public TaskNameChecker(String expName) {
+ this.expName = expName == null ? getClass().getName() : expName;
+ }
+
+ /** Asserts that the task name reported by the session equals the expected one. */
+ protected void checkName() {
+ assertEquals(expName, ses.getTaskName());
+ }
+ }
+
+ /** Delegates to {@code GridTestUtils.assertThrowsWithCause}, expecting {@link Exception}. */
+ private void assertThrows(RunnableX r) {
+ GridTestUtils.assertThrowsWithCause(r, Exception.class);
+ }
+
+ /** Reducer whose {@code collect} always returns {@code false} and whose {@code reduce} returns {@code null}. */
+ private static class TestReducer implements IgniteReducer<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public boolean collect(@Nullable Void o) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void reduce() {
+ return null;
+ }
+ }
+
+ /** @return Singleton list holding {@code t}, or {@code null} if {@code t} is {@code null}. */
+ private static <T> Collection<T> toList(T t) {
+ return t == null ? null : singletonList(t);
+ }
+
+ /** Action over a compute instance and a computation object that is allowed to throw a checked exception. */
+ private interface ComputationConsumer<T> {
+ /** Performs the action with the given compute instance and computation object. */
+ void accept(IgniteCompute c, T t) throws Exception;
+ }
+
+ /** @return Test computation matching {@code taskCls}, created with {@code name} as its expected task name. */
+ private <T> T getComputationObject(Class<T> taskCls, String name) {
+ if (ComputeTask.class.equals(taskCls))
+ return (T)new TestTask(name);
+ else if (IgniteClosure.class.equals(taskCls))
+ return (T)new TestClosure(name);
+ else if (IgniteCallable.class.equals(taskCls))
+ return (T)new TestCallable(name);
+ else if (IgniteRunnable.class.equals(taskCls))
+ return (T)new TestRunnable(name);
+ else
+ throw new IllegalStateException();
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index f34166a6a09..922325aff53 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -87,6 +87,7 @@ import
org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutor
import
org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
import org.apache.ignite.internal.processors.compute.InterruptComputeJobTest;
import
org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
+import
org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest;
import org.apache.ignite.internal.util.StripedExecutorTest;
import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest;
@@ -183,7 +184,8 @@ import org.junit.runners.Suite;
ComputeJobChangePriorityTest.class,
ComputeJobStatusTest.class,
ComputeTaskWithWithoutFullSupportTest.class,
- InterruptComputeJobTest.class
+ InterruptComputeJobTest.class,
+ TaskOptionsPropagationTest.class
})
public class IgniteComputeGridTestSuite {
}