This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2abc2470c2c IGNITE-18545 Fixed task execution options propagation. (#10482)
2abc2470c2c is described below

commit 2abc2470c2c0d5226aad5694fb664cd4d45eecfd
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sun Jan 22 22:00:11 2023 +0300

    IGNITE-18545 Fixed task execution options propagation. (#10482)
---
 .../main/java/org/apache/ignite/IgniteCompute.java |   4 +-
 .../apache/ignite/internal/IgniteComputeImpl.java  | 254 +++++++-------
 .../internal/executor/GridExecutorService.java     |  32 +-
 .../processors/affinity/GridAffinityProcessor.java |  10 +-
 .../processors/cache/GridCacheAdapter.java         |  61 ++--
 .../datastructures/CacheDataStructuresManager.java |  21 +-
 .../distributed/GridDistributedCacheAdapter.java   |  16 +-
 .../snapshot/IgniteSnapshotManager.java            |  64 ++--
 .../cache/query/GridCacheQueryManager.java         |  18 +-
 .../processors/closure/GridClosureProcessor.java   | 367 +++++----------------
 .../platform/client/compute/ClientComputeTask.java |  21 +-
 .../platform/compute/PlatformCompute.java          |   4 -
 .../handlers/cache/GridCacheCommandHandler.java    |  18 +-
 .../rest/handlers/task/GridTaskCommandHandler.java |  13 +-
 .../processors/service/GridServiceProxy.java       |  22 +-
 .../processors/task/GridTaskProcessor.java         | 166 ++--------
 .../processors/task/GridTaskThreadContextKey.java  |  47 ---
 .../internal/processors/task/GridTaskWorker.java   |  41 +--
 .../processors/task/TaskExecutionOptions.java      | 189 +++++++++++
 .../main/resources/META-INF/classnames.properties  |   2 +-
 .../internal/GridContinuousTaskSelfTest.java       |  22 +-
 .../IgniteComputeTopologyExceptionTest.java        |   9 +-
 .../compute/TaskOptionsPropagationTest.java        | 299 +++++++++++++++++
 .../testsuites/IgniteComputeGridTestSuite.java     |   4 +-
 24 files changed, 939 insertions(+), 765 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index 6e9cbea852d..e6110c9d189 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -724,9 +724,7 @@ public interface IgniteCompute extends IgniteAsyncSupport {
 
     /**
      * Sets task timeout for the next executed task in the <b>current 
thread</b>.
-     * When task starts execution, the timeout is reset, so one timeout is 
used only once. You may use
-     * this method to set task name when executing jobs directly, without 
explicitly
-     * defining {@link ComputeTask}.
+     * When task starts execution, the timeout is reset, so one timeout is 
used only once.
      * <p>
      * Here is an example.
      * <pre class="brush:java">
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index ca9f5344d01..f4a0d125f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -25,7 +25,6 @@ import java.io.ObjectStreamException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteDeploymentException;
@@ -35,6 +34,7 @@ import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -50,11 +50,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
 
 /**
  * {@link IgniteCompute} implementation.
@@ -73,6 +68,11 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** Custom executor name. */
     private String execName;
 
+    /** Default task execution options. */
+    private final ThreadLocal<TaskExecutionOptions> opts = 
ThreadLocal.withInitial(() ->
+        TaskExecutionOptions.options(prj.nodes()).withExecutor(execName)
+    );
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -105,7 +105,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      *
      * @param ctx Kernal context.
      * @param prj Projection.
-     * @param subjId Subject ID.
      * @param async Async support flag.
      * @param execName Custom executor name.
      */
@@ -130,8 +129,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
 
     /** {@inheritDoc} */
     @Override public void affinityRun(String cacheName, Object affKey, 
IgniteRunnable job) {
-        CU.validateCacheName(cacheName);
-
         try {
             saveOrGet(affinityRunAsync0(cacheName, affKey, job));
         }
@@ -143,7 +140,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> affinityRunAsync(String cacheName, 
Object affKey,
         IgniteRunnable job) throws IgniteException {
-        CU.validateCacheName(cacheName);
 
         return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheName, 
affKey, job));
     }
@@ -157,12 +153,13 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private IgniteInternalFuture<?> affinityRunAsync0(String cacheName, Object 
affKey, IgniteRunnable job) {
-        A.notNull(affKey, "affKey");
-        A.notNull(job, "job");
-
         guard();
 
         try {
+            A.notNull(affKey, "affKey");
+            A.notNull(job, "job");
+            CU.validateCacheName(cacheName);
+
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, 
affKey);
             int partId = ctx.affinity().partition(cacheName, affKey0);
@@ -171,20 +168,20 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: 
[cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return 
ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, 
prj.nodes(), execName);
+            return 
ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
 
     /** {@inheritDoc} */
     @Override public void affinityRun(@NotNull Collection<String> cacheNames, 
Object affKey, IgniteRunnable job) {
-        CU.validateCacheNames(cacheNames);
-
         try {
             saveOrGet(affinityRunAsync0(cacheNames, affKey, job));
         }
@@ -196,8 +193,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> affinityRunAsync(@NotNull 
Collection<String> cacheNames, Object affKey,
         IgniteRunnable job) throws IgniteException {
-        CU.validateCacheNames(cacheNames);
-
         return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, 
affKey, job));
     }
 
@@ -211,13 +206,14 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private IgniteInternalFuture<?> affinityRunAsync0(@NotNull 
Collection<String> cacheNames, Object affKey,
         IgniteRunnable job) {
-        A.notNull(affKey, "affKey");
-        A.notNull(job, "job");
-        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
         guard();
 
         try {
+            A.notNull(affKey, "affKey");
+            A.notNull(job, "job");
+            A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+            CU.validateCacheNames(cacheNames);
+
             final String cacheName = F.first(cacheNames);
 
             // In case cache key is passed instead of affinity key.
@@ -228,20 +224,20 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: 
[cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityRun(cacheNames, partId, job, 
prj.nodes(), execName);
+            return ctx.closure().affinityRun(cacheNames, partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
 
     /** {@inheritDoc} */
     @Override public void affinityRun(@NotNull Collection<String> cacheNames, 
int partId, IgniteRunnable job) {
-        CU.validateCacheNames(cacheNames);
-
         try {
             saveOrGet(affinityRunAsync0(cacheNames, partId, job));
         }
@@ -253,8 +249,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> affinityRunAsync(@NotNull 
Collection<String> cacheNames, int partId,
         IgniteRunnable job) throws IgniteException {
-        CU.validateCacheNames(cacheNames);
-
         return (IgniteFuture<Void>)createFuture(affinityRunAsync0(cacheNames, 
partId, job));
     }
 
@@ -268,27 +262,28 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private IgniteInternalFuture<?> affinityRunAsync0(@NotNull 
Collection<String> cacheNames, int partId,
         IgniteRunnable job) {
-        A.ensure(partId >= 0, "partId = " + partId);
-        A.notNull(job, "job");
-        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
         guard();
 
         try {
-            return ctx.closure().affinityRun(cacheNames, partId, job, 
prj.nodes(), execName);
+            A.ensure(partId >= 0, "partId = " + partId);
+            A.notNull(job, "job");
+            A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+            CU.validateCacheNames(cacheNames);
+
+            return ctx.closure().affinityRun(cacheNames, partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
 
     /** {@inheritDoc} */
     @Override public <R> R affinityCall(String cacheName, Object affKey, 
IgniteCallable<R> job) {
-        CU.validateCacheName(cacheName);
-
         try {
             return saveOrGet(affinityCallAsync0(cacheName, affKey, job));
         }
@@ -300,8 +295,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> affinityCallAsync(String cacheName, 
Object affKey,
         IgniteCallable<R> job) throws IgniteException {
-        CU.validateCacheName(cacheName);
-
         return createFuture(affinityCallAsync0(cacheName, affKey, job));
     }
 
@@ -315,12 +308,13 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <R> IgniteInternalFuture<R> affinityCallAsync0(String cacheName, 
Object affKey,
         IgniteCallable<R> job) {
-        A.notNull(affKey, "affKey");
-        A.notNull(job, "job");
-
         guard();
 
         try {
+            A.notNull(affKey, "affKey");
+            A.notNull(job, "job");
+            CU.validateCacheName(cacheName);
+
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, 
affKey);
             int partId = ctx.affinity().partition(cacheName, affKey0);
@@ -329,20 +323,20 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: 
[cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return 
ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, 
prj.nodes(), execName);
+            return 
ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
 
     /** {@inheritDoc} */
     @Override public <R> R affinityCall(@NotNull Collection<String> 
cacheNames, Object affKey, IgniteCallable<R> job) {
-        CU.validateCacheNames(cacheNames);
-
         try {
             return saveOrGet(affinityCallAsync0(cacheNames, affKey, job));
         }
@@ -354,8 +348,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull 
Collection<String> cacheNames, Object affKey,
         IgniteCallable<R> job) throws IgniteException {
-        CU.validateCacheNames(cacheNames);
-
         return createFuture(affinityCallAsync0(cacheNames, affKey, job));
     }
 
@@ -369,13 +361,14 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull 
Collection<String> cacheNames, Object affKey,
         IgniteCallable<R> job) {
-        A.notNull(affKey, "affKey");
-        A.notNull(job, "job");
-        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
         guard();
 
         try {
+            A.notNull(affKey, "affKey");
+            A.notNull(job, "job");
+            A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+            CU.validateCacheNames(cacheNames);
+
             final String cacheName = F.first(cacheNames);
 
             // In case cache key is passed instead of affinity key.
@@ -386,20 +379,20 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
                 throw new IgniteCheckedException("Failed map key to partition: 
[cache=" + cacheName + " key="
                     + affKey + ']');
 
-            return ctx.closure().affinityCall(cacheNames, partId, job, 
prj.nodes(), execName);
+            return ctx.closure().affinityCall(cacheNames, partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
 
     /** {@inheritDoc} */
     @Override public <R> R affinityCall(@NotNull Collection<String> 
cacheNames, int partId, IgniteCallable<R> job) {
-        CU.validateCacheNames(cacheNames);
-
         try {
             return saveOrGet(affinityCallAsync0(cacheNames, partId, job));
         }
@@ -411,8 +404,6 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> affinityCallAsync(@NotNull 
Collection<String> cacheNames, int partId,
         IgniteCallable<R> job) throws IgniteException {
-        CU.validateCacheNames(cacheNames);
-
         return createFuture(affinityCallAsync0(cacheNames, partId, job));
     }
 
@@ -426,19 +417,22 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <R> IgniteInternalFuture<R> affinityCallAsync0(@NotNull 
Collection<String> cacheNames, int partId,
         IgniteCallable<R> job) {
-        A.ensure(partId >= 0, "partId = " + partId);
-        A.notNull(job, "job");
-        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
-
         guard();
 
         try {
-            return ctx.closure().affinityCall(cacheNames, partId, job, 
prj.nodes(), execName);
+            A.ensure(partId >= 0, "partId = " + partId);
+            A.notNull(job, "job");
+            A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+            CU.validateCacheNames(cacheNames);
+
+            return ctx.closure().affinityCall(cacheNames, partId, job, 
opts.get());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -446,7 +440,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <T, R> R execute(String taskName, @Nullable T arg) {
         try {
-            return (R)saveOrGet(executeAsync0(taskName, arg));
+            return saveOrGet(executeAsync0(taskName, arg));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -466,16 +460,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <T, R> IgniteInternalFuture<R> executeAsync0(String taskName, 
@Nullable T arg) {
-        A.notNull(taskName, "taskName");
-
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, 
prj.predicate());
+            A.notNull(taskName, "taskName");
 
-            return ctx.task().execute(taskName, arg, execName);
+            return ctx.task().execute(taskName, arg, opts.get());
         }
         finally {
+            opts.remove();
+            
             unguard();
         }
     }
@@ -483,7 +477,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <T, R> R execute(Class<? extends ComputeTask<T, R>> 
taskCls, @Nullable T arg) {
         try {
-            return (R)saveOrGet(executeAsync0(taskCls, arg));
+            return saveOrGet(executeAsync0(taskCls, arg));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -504,16 +498,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <T, R> IgniteInternalFuture<R> executeAsync0(Class<? extends 
ComputeTask<T, R>> taskCls, @Nullable T arg) {
-        A.notNull(taskCls, "taskCls");
-
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, 
prj.predicate());
+            A.notNull(taskCls, "taskCls");
 
-            return ctx.task().execute(taskCls, arg, execName);
+            return ctx.task().execute(taskCls, arg, 
opts.get().withProjectionPredicate(prj.predicate()));
         }
         finally {
+            opts.remove();
+            
             unguard();
         }
     }
@@ -521,7 +515,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <T, R> R execute(ComputeTask<T, R> task, @Nullable T arg) 
{
         try {
-            return (R)saveOrGet(executeAsync0(task, arg));
+            return saveOrGet(executeAsync0(task, arg));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -542,16 +536,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Task future.
      */
     public <T, R> ComputeTaskInternalFuture<R> executeAsync0(ComputeTask<T, R> 
task, @Nullable T arg) {
-        A.notNull(task, "task");
-
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, 
prj.predicate());
+            A.notNull(task, "task");
 
-            return ctx.task().execute(task, arg, execName);
+            return ctx.task().execute(task, arg, 
opts.get().withProjectionPredicate(prj.predicate()));
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -578,14 +572,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private IgniteInternalFuture<?> broadcastAsync0(IgniteRunnable job) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().runAsync(BROADCAST, job, prj.nodes(), 
execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().runAsync(BROADCAST, job, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -612,14 +608,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <R> IgniteInternalFuture<Collection<R>> 
broadcastAsync0(IgniteCallable<R> job) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(BROADCAST, 
Collections.singletonList(job), prj.nodes(), execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().callAsync(BROADCAST, 
Collections.singletonList(job), opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -648,14 +646,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <R, T> IgniteInternalFuture<Collection<R>> 
broadcastAsync0(IgniteClosure<T, R> job, @Nullable T arg) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().broadcast(job, arg, prj.nodes(), execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().broadcast(job, arg, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -682,14 +682,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private IgniteInternalFuture<?> runAsync0(IgniteRunnable job) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().runAsync(BALANCE, job, prj.nodes(), execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().runAsync(BALANCE, job, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -717,14 +719,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private IgniteInternalFuture<?> runAsync0(Collection<? extends 
IgniteRunnable> jobs) {
-        A.notEmpty(jobs, "jobs");
-
         guard();
 
         try {
-            return ctx.closure().runAsync(BALANCE, jobs, prj.nodes(), 
execName);
+            A.notEmpty(jobs, "jobs");
+
+            return ctx.closure().runAsync(BALANCE, jobs, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -742,7 +746,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R, T> IgniteFuture<R> applyAsync(IgniteClosure<T, R> 
job, @Nullable T arg)
         throws IgniteException {
-        return (IgniteFuture<R>)createFuture(applyAsync0(job, arg));
+        return createFuture(applyAsync0(job, arg));
     }
 
     /**
@@ -753,14 +757,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <R, T> IgniteInternalFuture<R> applyAsync0(IgniteClosure<T, R> 
job, @Nullable T arg) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(job, arg, prj.nodes(), execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().callAsync(job, arg, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -777,7 +783,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
 
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> callAsync(IgniteCallable<R> job) 
throws IgniteException {
-        return (IgniteFuture<R>)createFuture(callAsync0(job));
+        return createFuture(callAsync0(job));
     }
 
     /**
@@ -787,14 +793,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <R> IgniteInternalFuture<R> callAsync0(IgniteCallable<R> job) {
-        A.notNull(job, "job");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(BALANCE, job, prj.nodes(), 
execName);
+            A.notNull(job, "job");
+
+            return ctx.closure().callAsync(BALANCE, job, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -812,7 +820,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<Collection<R>> callAsync(
         Collection<? extends IgniteCallable<R>> jobs) throws IgniteException {
-        return (IgniteFuture<Collection<R>>)createFuture(callAsync0(jobs));
+        return createFuture(callAsync0(jobs));
     }
 
     /**
@@ -822,14 +830,16 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      * @return Internal future.
      */
     private <R> IgniteInternalFuture<Collection<R>> callAsync0(Collection<? 
extends IgniteCallable<R>> jobs) {
-        A.notEmpty(jobs, "jobs");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(BALANCE, (Collection<? extends 
Callable<R>>)jobs, prj.nodes(), execName);
+            A.notEmpty(jobs, "jobs");
+
+            return ctx.closure().callAsync(BALANCE, jobs, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -847,7 +857,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <T, R> IgniteFuture<Collection<R>> 
applyAsync(IgniteClosure<T, R> job,
         Collection<? extends T> args) throws IgniteException {
-        return (IgniteFuture<Collection<R>>)createFuture(applyAsync0(job, 
args));
+        return createFuture(applyAsync0(job, args));
     }
 
     /**
@@ -859,15 +869,17 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <T, R> IgniteInternalFuture<Collection<R>> applyAsync0(final 
IgniteClosure<T, R> job,
         @Nullable Collection<? extends T> args) {
-        A.notNull(job, "job");
-        A.notNull(args, "args");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(job, args, prj.nodes(), execName);
+            A.notNull(job, "job");
+            A.notNull(args, "args");
+
+            return ctx.closure().callAsync(job, args, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -885,7 +897,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
     /** {@inheritDoc} */
     @Override public <R1, R2> IgniteFuture<R2> callAsync(Collection<? extends 
IgniteCallable<R1>> jobs,
         IgniteReducer<R1, R2> rdc) throws IgniteException {
-        return (IgniteFuture<R2>)createFuture(callAsync0(jobs, rdc));
+        return createFuture(callAsync0(jobs, rdc));
     }
 
     /**
@@ -897,15 +909,17 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <R1, R2> IgniteInternalFuture<R2> callAsync0(Collection<? extends 
IgniteCallable<R1>> jobs,
         IgniteReducer<R1, R2> rdc) {
-        A.notEmpty(jobs, "jobs");
-        A.notNull(rdc, "rdc");
-
         guard();
 
         try {
-            return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, 
prj.nodes(), execName);
+            A.notEmpty(jobs, "jobs");
+            A.notNull(rdc, "rdc");
+
+            return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -937,16 +951,18 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
      */
     private <R1, R2, T> IgniteInternalFuture<R2> applyAsync0(IgniteClosure<T, 
R1> job, Collection<? extends T> args,
         IgniteReducer<R1, R2> rdc) {
-        A.notNull(job, "job");
-        A.notNull(rdc, "rdc");
-        A.notNull(args, "args");
-
         guard();
 
         try {
-            return ctx.closure().callAsync(job, args, rdc, prj.nodes(), 
execName);
+            A.notNull(job, "job");
+            A.notNull(rdc, "rdc");
+            A.notNull(args, "args");
+
+            return ctx.closure().callAsync(job, args, rdc, opts.get());
         }
         finally {
+            opts.remove();
+
             unguard();
         }
     }
@@ -970,7 +986,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContext(TC_TASK_NAME, taskName);
+            opts.get().withName(taskName);
         }
         finally {
             unguard();
@@ -986,7 +1002,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContext(TC_TIMEOUT, timeout);
+            opts.get().withTimeout(timeout);
         }
         finally {
             unguard();
@@ -1000,7 +1016,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
+            opts.get().withFailoverDisabled();
         }
         finally {
             unguard();
@@ -1014,7 +1030,7 @@ public class IgniteComputeImpl extends 
AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContext(TC_NO_RESULT_CACHE, true);
+            opts.get().withResultCacheDisabled();
         }
         finally {
             unguard();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
index cf1c78b17b7..1f9f87e3f48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/executor/GridExecutorService.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -259,7 +260,7 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
         ctx.gateway().readLock();
 
         try {
-            return addFuture(ctx.closure().callAsync(BALANCE, task, 
prj.nodes()));
+            return addFuture(ctx.closure().callAsync(BALANCE, task, 
options()));
         }
         finally {
             ctx.gateway().readUnlock();
@@ -275,14 +276,16 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
         ctx.gateway().readLock();
 
         try {
-            IgniteInternalFuture<T> fut = ctx.closure().runAsync(BALANCE, 
task, prj.nodes()).chain(
-                new CX1<IgniteInternalFuture<?>, T>() {
-                    @Override public T applyx(IgniteInternalFuture<?> fut) 
throws IgniteCheckedException {
-                        fut.get();
+            IgniteInternalFuture<T> fut = ctx.closure()
+                .runAsync(BALANCE, task, options())
+                .chain(
+                    new CX1<IgniteInternalFuture<?>, T>() {
+                        @Override public T applyx(IgniteInternalFuture<?> fut) 
throws IgniteCheckedException {
+                            fut.get();
 
-                        return res;
-                    }
-                });
+                            return res;
+                        }
+                    });
 
             return addFuture(fut);
         }
@@ -300,7 +303,7 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
         ctx.gateway().readLock();
 
         try {
-            return addFuture(ctx.closure().runAsync(BALANCE, task, 
prj.nodes()));
+            return addFuture(ctx.closure().runAsync(BALANCE, task, options()));
         }
         finally {
             ctx.gateway().readUnlock();
@@ -364,7 +367,7 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
             ctx.gateway().readLock();
 
             try {
-                fut = ctx.closure().callAsync(BALANCE, task, prj.nodes());
+                fut = ctx.closure().callAsync(BALANCE, task, options());
             }
             finally {
                 ctx.gateway().readUnlock();
@@ -497,7 +500,7 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
             ctx.gateway().readLock();
 
             try {
-                fut = ctx.closure().callAsync(BALANCE, cmd, prj.nodes());
+                fut = ctx.closure().callAsync(BALANCE, cmd, options());
             }
             finally {
                 ctx.gateway().readUnlock();
@@ -574,7 +577,7 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
         ctx.gateway().readLock();
 
         try {
-            addFuture(ctx.closure().runAsync(BALANCE, cmd, prj.nodes()));
+            addFuture(ctx.closure().runAsync(BALANCE, cmd, options()));
         }
         finally {
             ctx.gateway().readUnlock();
@@ -727,4 +730,9 @@ public class GridExecutorService implements 
ExecutorService, Externalizable {
             }
         }
     }
+
+    /** @return Task execution options with node projection automatically set. 
*/
+    private TaskExecutionOptions options() {
+        return TaskExecutionOptions.options(prj.nodes());
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index caddc85d9b6..4067df9665f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -70,11 +70,13 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 import static 
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.affinityJob;
 import static 
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.unmarshall;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Data affinity processor.
@@ -606,7 +608,13 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      */
     private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String 
cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
         IgniteInternalFuture<GridTuple3<GridAffinityMessage, 
GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
-            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), 
F.asList(n), true/*system pool*/, 0, false);
+            .callAsync(
+                BROADCAST,
+                affinityJob(cacheName, topVer),
+                options(F.asList(n))
+                    .withFailoverDisabled()
+                    .asSystemTask()
+            );
 
         return fut.chain(
             new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, 
GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 36378b008f8..9eb04c8b208 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -76,7 +76,6 @@ import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
-import org.apache.ignite.internal.ComputeTaskInternalFuture;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -174,8 +173,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -1181,10 +1179,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         Collection<ClusterNode> srvNodes = 
ctx.grid().cluster().forCacheNodes(name(), !near, near, false).nodes();
 
         if (!srvNodes.isEmpty()) {
-            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, srvNodes);
-
             return ctx.kernalContext().task().execute(
-                new ClearTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), keys, near), null);
+                new ClearTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), keys, near),
+                null,
+                options(srvNodes)
+            );
         }
         else
             return new GridFinishedFuture<>();
@@ -1211,8 +1210,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return new GridFinishedFuture<>();
         }
 
-        return ctx.closures().affinityRun(Collections.singleton(name()), part,
-            new PartitionPreloadJob(ctx.name(), part), grp.nodes(), null);
+        return ctx.closures().affinityRun(
+            Collections.singleton(name()),
+            part,
+            new PartitionPreloadJob(ctx.name(), part),
+            options(grp.nodes())
+        );
     }
 
     /**
@@ -3818,12 +3821,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>();
 
-        return ctx.closures().callAsyncNoFailover(BROADCAST,
+        return ctx.closures().callAsync(
+            BROADCAST,
             new LoadKeysCallable<>(ctx.name(), keys, update, plc, keepBinary),
-            nodes,
-            true,
-            0,
-            false);
+            options(nodes)
+                .withFailoverDisabled()
+                .asSystemTask()
+        );
     }
 
     /**
@@ -3929,8 +3933,6 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable 
IgniteBiPredicate<K, V> p, @Nullable Object... args)
         throws IgniteCheckedException {
 
-        ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
-
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
@@ -3944,10 +3946,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
-        ComputeTaskInternalFuture fut = 
ctx.kernalContext().closure().callAsync(BROADCAST,
-            Collections.singletonList(
-                new LoadCacheJobV2<>(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
-            nodes);
+        IgniteInternalFuture<?> fut = ctx.kernalContext().closure().callAsync(
+            BROADCAST,
+            Collections.singletonList(new LoadCacheJobV2<>(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
+            options(nodes));
 
         return fut;
     }
@@ -3982,10 +3984,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
         return ctx.kernalContext().task().execute(
-            new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), 
peekModes), null);
+            new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), 
peekModes),
+            null,
+            options(nodes)
+        );
     }
 
     /** {@inheritDoc} */
@@ -4003,10 +4006,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0L);
 
-        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
         return ctx.kernalContext().task().execute(
-            new SizeLongTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), null);
+            new SizeLongTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes),
+            null,
+            options(nodes)
+        );
     }
 
     /** {@inheritDoc} */
@@ -4032,10 +4036,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0L);
 
-        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
         return ctx.kernalContext().task().execute(
-            new PartitionSizeLongTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes, part), null);
+            new PartitionSizeLongTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes, part),
+            null,
+            options(nodes)
+        );
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index c5f493b694d..2defa91c2b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -73,6 +73,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static javax.cache.event.EventType.REMOVED;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  *
@@ -474,11 +475,13 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
             Collection<ClusterNode> nodes = cctx.discovery().nodes(topVer);
 
             try {
-                cctx.closures().callAsyncNoFailover(BROADCAST,
+                cctx.closures().callAsync(
+                    BROADCAST,
                     new BlockSetCallable(cctx.name(), id),
-                    nodes,
-                    true,
-                    0, false).get();
+                    options(nodes)
+                        .withFailoverDisabled()
+                        .asSystemTask()
+                ).get();
 
                 // Separated cache will be destroyed after the set is blocked.
                 if (separated)
@@ -504,11 +507,13 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
             Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
 
             try {
-                cctx.closures().callAsyncNoFailover(BROADCAST,
+                cctx.closures().callAsync(
+                    BROADCAST,
                     new RemoveSetDataCallable(cctx.name(), id, topVer),
-                    affNodes,
-                    true,
-                    0, false).get();
+                    options(affNodes)
+                        .withFailoverDisabled()
+                        .asSystemTask()
+                ).get();
             }
             catch (IgniteCheckedException e) {
                 if (e.hasCause(ClusterTopologyCheckedException.class)) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5214abaa84b..4a5abb62b25 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -62,7 +62,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Distributed cache implementation.
@@ -184,10 +184,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
 
                 if (!nodes.isEmpty()) {
-                    ctx.kernalContext().task().setThreadContext(TC_SUBGRID, 
nodes);
-
                     retry = !ctx.kernalContext().task().execute(
-                        new RemoveAllTask(ctx.name(), topVer, skipStore, 
keepBinary), null).get();
+                        new RemoveAllTask(ctx.name(), topVer, skipStore, 
keepBinary),
+                        null,
+                        options(nodes)
+                    ).get();
                 }
             }
             while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || retry);
@@ -225,10 +226,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
-            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
-
             IgniteInternalFuture<Boolean> rmvAll = 
ctx.kernalContext().task().execute(
-                new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), 
null);
+                new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary),
+                null,
+                options(nodes)
+            );
 
             rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() 
{
                 @Override public void apply(IgniteInternalFuture<Boolean> fut) 
{
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 5d655f5b760..212e5ba8b07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -225,8 +225,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.Pa
 import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
 import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
 import static 
org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
 import static 
org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
 import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
@@ -1264,12 +1263,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
 
         return cctx.kernalContext().closure()
-            .callAsyncNoFailover(BROADCAST,
+            .callAsync(
+                BROADCAST,
                 new CancelSnapshotCallable(null, name),
-                cctx.discovery().aliveServerNodes(),
-                false,
-                0,
-                true);
+                options(cctx.discovery().aliveServerNodes())
+                    .withFailoverDisabled()
+                    .withAuthenticationDisabled()
+            );
     }
 
     /**
@@ -1282,12 +1282,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);
 
         IgniteInternalFuture<Boolean> fut0 = cctx.kernalContext().closure()
-            .callAsyncNoFailover(BROADCAST,
+            .callAsync(
+                BROADCAST,
                 new CancelSnapshotCallable(reqId, null),
-                cctx.discovery().aliveServerNodes(),
-                false,
-                0,
-                true);
+                options(cctx.discovery().aliveServerNodes())
+                    .withFailoverDisabled()
+                    .withAuthenticationDisabled()
+            );
 
         return new IgniteFutureImpl<>(fut0);
     }
@@ -1431,12 +1432,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         Collection<ClusterNode> bltNodes = 
F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
             (node) -> CU.baselineNode(node, kctx0.state().clusterState()));
 
-        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
-        kctx0.task().setThreadContext(TC_SUBGRID, bltNodes);
-
         SnapshotMetadataCollectorTaskArg taskArg = new 
SnapshotMetadataCollectorTaskArg(name, snpPath);
 
-        kctx0.task().execute(SnapshotMetadataCollectorTask.class, 
taskArg).listen(f0 -> {
+        kctx0.task().execute(
+            SnapshotMetadataCollectorTask.class,
+            taskArg,
+            options(bltNodes).withAuthenticationDisabled()
+        ).listen(f0 -> {
             if (f0.error() == null) {
                 Map<ClusterNode, List<SnapshotMetadata>> metas = f0.result();
 
@@ -1511,14 +1513,14 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                     return;
                 }
 
-                kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
-                kctx0.task().setThreadContext(TC_SUBGRID, new 
ArrayList<>(metas.keySet()));
-
                 Class<? extends AbstractSnapshotVerificationTask> cls =
                     includeCustomHandlers ? SnapshotHandlerRestoreTask.class : 
SnapshotPartitionsVerifyTask.class;
 
-                kctx0.task().execute(cls, new 
SnapshotPartitionsVerifyTaskArg(grps, metas, snpPath))
-                    .listen(f1 -> {
+                kctx0.task().execute(
+                        cls,
+                        new SnapshotPartitionsVerifyTaskArg(grps, metas, 
snpPath),
+                        options(new 
ArrayList<>(metas.keySet())).withAuthenticationDisabled()
+                    ).listen(f1 -> {
                         if (f1.error() == null)
                             res.onDone(f1.result());
                         else if (f1.error() instanceof 
IgniteSnapshotVerifyException)
@@ -1693,12 +1695,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
                     throw new IgniteException("There is no alive server nodes 
in the cluster");
 
                 return new 
IgniteSnapshotFutureImpl(cctx.kernalContext().closure()
-                    .callAsyncNoFailover(BALANCE,
+                    .callAsync(
+                        BALANCE,
                         new CreateSnapshotCallable(name),
-                        Collections.singletonList(crd),
-                        false,
-                        0,
-                        true));
+                        options(Collections.singletonList(crd))
+                            .withFailoverDisabled()
+                            .withAuthenticationDisabled()
+                    ));
             }
 
             ClusterSnapshotFuture snpFut0;
@@ -2323,10 +2326,11 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
         Collection<ClusterNode> bltNodes = 
F.view(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
             (node) -> CU.baselineNode(node, 
cctx.kernalContext().state().clusterState()));
 
-        cctx.kernalContext().task().setThreadContext(TC_SKIP_AUTH, true);
-        cctx.kernalContext().task().setThreadContext(TC_SUBGRID, bltNodes);
-
-        return new 
IgniteFutureImpl<>(cctx.kernalContext().task().execute(taskCls, snpName));
+        return new IgniteFutureImpl<>(cctx.kernalContext().task().execute(
+            taskCls,
+            snpName,
+            options(bltNodes).withAuthenticationDisabled()
+        ));
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 31f02b8cc61..a139361a2a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -135,6 +135,7 @@ import 
org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.apache.ignite.spi.indexing.IndexingSpi;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -150,6 +151,7 @@ import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Query and index manager.
@@ -1868,7 +1870,13 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
 
             // Get metadata from remote nodes.
             if (!nodes.isEmpty())
-                rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, 
Collections.singleton(job), nodes, true, 0);
+                rmtFut = cctx.closures().callAsync(
+                    BROADCAST,
+                    Collections.singleton(job),
+                    options(nodes)
+                        .withFailoverDisabled()
+                        .asSystemTask()
+                );
 
             // Get local metadata.
             IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = 
cctx.closures().callLocalSafe(job, true);
@@ -1968,7 +1976,13 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                 if (!allNodesNew)
                     return sqlMetadata();
 
-                rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, 
Collections.singleton(job), nodes, true, 0);
+                rmtFut = cctx.closures().callAsync(
+                    BROADCAST,
+                    Collections.singleton(job),
+                    options(nodes)
+                        .withFailoverDisabled()
+                        .asSystemTask()
+                );
             }
 
             // Get local metadata.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 068a35fb8a2..69bdd33378d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -75,10 +76,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
 
 /**
  *
@@ -147,28 +144,13 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param mode Distribution mode.
      * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
-     * @return Task execution future.
-     */
-    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, 
@Nullable Collection<? extends Runnable> jobs,
-        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
-        return runAsync(mode, jobs, nodes, false, execName);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Task execution future.
      */
-    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+    public ComputeTaskInternalFuture<?> runAsync(
+        GridClosureCallMode mode,
         Collection<? extends Runnable> jobs,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        @Nullable String execName
+        TaskExecutionOptions opts
     ) {
         assert mode != null;
         assert !F.isEmpty(jobs) : jobs;
@@ -181,12 +163,10 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
                     new IgniteCheckedException("Closure processor cannot be 
used on stopped grid: " + ctx.igniteInstanceName()));
             }
 
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
+            return ctx.task().execute(new T1(mode, jobs), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -196,39 +176,13 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param mode Distribution mode.
      * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @return Task execution future.
-     */
-    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, 
Runnable job,
-        @Nullable Collection<ClusterNode> nodes) {
-        return runAsync(mode, job, nodes, null);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
-     * @return Task execution future.
-     */
-    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, 
Runnable job,
-        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
-        return runAsync(mode, job, nodes, false, execName);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Task execution future.
      */
-    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+    public ComputeTaskInternalFuture<?> runAsync(
+        GridClosureCallMode mode,
         Runnable job,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        @Nullable String execName
+        TaskExecutionOptions opts
     ) {
         assert mode != null;
         assert job != null;
@@ -236,12 +190,10 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T2(mode, job), null, sys, execName);
+            return ctx.task().execute(new T2(mode, job), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -359,17 +311,16 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param mode Distribution mode.
      * @param jobs Closures to execute.
      * @param rdc Reducer.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @param <R1> Type.
      * @param <R2> Type.
      * @return Reduced result.
      */
-    public <R1, R2> ComputeTaskInternalFuture<R2> 
forkjoinAsync(GridClosureCallMode mode,
+    public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(
+        GridClosureCallMode mode,
         Collection<? extends Callable<R1>> jobs,
         IgniteReducer<R1, R2> rdc,
-        @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName
+        TaskExecutionOptions opts
     ) {
         assert mode != null;
         assert rdc != null;
@@ -378,12 +329,10 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T3<>(mode, jobs, rdc), null, 
execName);
+            return ctx.task().execute(new T3<>(mode, jobs, rdc), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -393,112 +342,55 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param mode Distribution mode.
      * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param <R> Type.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
-        GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R>> jobs,
-        @Nullable Collection<ClusterNode> nodes) {
-        return callAsync(mode, jobs, nodes, null);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+    public <R> IgniteInternalFuture<Collection<R>> callAsync(
         GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R>> jobs,
-        @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName) {
-        return callAsync(mode, jobs, nodes, false, execName);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Custom executor name.
-     * @param <R> Type.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> ComputeTaskInternalFuture<Collection<R>> 
callAsync(GridClosureCallMode mode,
         Collection<? extends Callable<R>> jobs,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        @Nullable String execName
+        TaskExecutionOptions opts
     ) {
         assert mode != null;
+        assert opts.isFailoverDisabled() || !F.isEmpty(jobs);
+
         assert !F.isEmpty(jobs);
 
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
-                return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, 
U.emptyTopologyException());
+            if (F.isEmpty(jobs) && opts.isFailoverDisabled())
+                return new GridFinishedFuture<>();
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
+            if (F.isEmpty(opts.projection()))
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, 
U.emptyTopologyException());
 
-            return ctx.task().execute(new T6<>(mode, jobs), null, sys, 
execName);
+            return ctx.task().execute(new T6<>(mode, jobs), null, opts);
         }
         finally {
             busyLock.readUnlock();
         }
     }
 
-    /**
-     * @param mode Distribution mode.
-     * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param <R> Type.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
-        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
-        return callAsync(mode, job, nodes, null);
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
-     * @param <R> Type.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
-        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName) {
-        return callAsync(mode, job, nodes, false, execName);
-    }
-
     /**
      * @param cacheNames Cache names.
      * @param partId Partition.
      * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Grid future for collection of closure results.
      * @throws IgniteCheckedException If failed.
      */
     public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull 
Collection<String> cacheNames,
         int partId,
         Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName) throws IgniteCheckedException {
+        TaskExecutionOptions opts
+    ) throws IgniteCheckedException {
         assert partId >= 0 : partId;
 
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, 
U.emptyTopologyException());
 
             final String cacheName = F.first(cacheNames);
@@ -509,10 +401,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             if (node == null)
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T5(node, job, cacheNames, partId, 
mapTopVer), null,
-                false, execName);
+            return ctx.task().execute(new T5(node, job, cacheNames, partId, 
mapTopVer), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -523,22 +412,22 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param cacheNames Cache names.
      * @param partId Partition.
      * @param job Job.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Job future.
      * @throws IgniteCheckedException If failed.
      */
-    public ComputeTaskInternalFuture<?> affinityRun(@NotNull 
Collection<String> cacheNames,
+    public ComputeTaskInternalFuture<?> affinityRun(
+        @NotNull Collection<String> cacheNames,
         int partId,
         Runnable job,
-        @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName) throws IgniteCheckedException {
+        TaskExecutionOptions opts
+    ) throws IgniteCheckedException {
         assert partId >= 0 : partId;
 
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, 
U.emptyTopologyException());
 
             final String cacheName = F.first(cacheNames);
@@ -549,10 +438,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             if (node == null)
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T4(node, job, cacheNames, partId, 
mapTopVer), null,
-                false, execName);
+            return ctx.task().execute(new T4(node, job, cacheNames, partId, 
mapTopVer), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -560,118 +446,30 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param <R> Type.
      * @param mode Distribution mode.
      * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param timeout Timeout.
-     * @param skipAuth Skip authorization check.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> IgniteInternalFuture<R> callAsyncNoFailover(
-        GridClosureCallMode mode,
-        @Nullable Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        long timeout,
-        boolean skipAuth) {
-        assert mode != null;
-        assert timeout >= 0 : timeout;
-
-        busyLock.readLock();
-
-        try {
-            if (job == null)
-                return new GridFinishedFuture<>();
-
-            if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(U.emptyTopologyException());
-
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            if (skipAuth)
-                ctx.task().setThreadContext(TC_SKIP_AUTH, true);
-
-            if (timeout > 0)
-                ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
-            return ctx.task().execute(new T7<>(mode, job), null, sys);
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param jobs Closures to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param timeout If greater than 0 limits task execution. Cannot be 
negative.
      * @param <R> Type.
+     * @param opts Task execution options.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(
+    public <R> IgniteInternalFuture<R> callAsync(
         GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R>> jobs,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        long timeout
+        Callable<R> job,
+        TaskExecutionOptions opts
     ) {
         assert mode != null;
-        assert timeout >= 0 : timeout;
+        assert opts.isFailoverDisabled() || job != null;
 
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(jobs))
+            if (job == null && opts.isFailoverDisabled())
                 return new GridFinishedFuture<>();
 
-            if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(U.emptyTopologyException());
-
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            if (timeout > 0)
-                ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
-            return ctx.task().execute(new T6<>(mode, jobs), null, sys);
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * @param mode Distribution mode.
-     * @param job Closure to execute.
-     * @param nodes Grid nodes.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Custom executor name.
-     * @param <R> Type.
-     * @return Grid future for collection of closure results.
-     */
-    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
-        Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes,
-        boolean sys,
-        @Nullable String execName
-    ) {
-        assert mode != null;
-        assert job != null;
-
-        busyLock.readLock();
-
-        try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T7<>(mode, job), null, sys, 
execName);
+            return ctx.task().execute(new T7<>(mode, job), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -681,21 +479,17 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param job Job closure.
      * @param arg Optional job argument.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Grid future for execution result.
      */
-    public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> 
job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+    public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> 
job, @Nullable T arg, TaskExecutionOptions opts) {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T8(job, arg), null, false, execName);
+            return ctx.task().execute(new T8(job, arg), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -705,21 +499,17 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param job Job closure.
      * @param arg Optional job argument.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Grid future for execution result.
      */
-    public <T, R> IgniteInternalFuture<Collection<R>> 
broadcast(IgniteClosure<T, R> job, @Nullable T arg,
-        @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+    public <T, R> IgniteInternalFuture<Collection<R>> 
broadcast(IgniteClosure<T, R> job, @Nullable T arg, TaskExecutionOptions opts) {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return new GridFinishedFuture<>(U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T11<>(job), arg, false, execName);
+            return ctx.task().execute(new T11<>(job), arg, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -729,24 +519,21 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
     /**
      * @param job Job closure.
      * @param args Job arguments.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Grid future for execution result.
      */
-    public <T, R> ComputeTaskInternalFuture<Collection<R>> 
callAsync(IgniteClosure<T, R> job,
+    public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+        IgniteClosure<T, R> job,
         @Nullable Collection<? extends T> args,
-        @Nullable Collection<ClusterNode> nodes,
-        @Nullable String execName
+        TaskExecutionOptions opts
     ) {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class, 
U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T9<>(job, args), null, false, 
execName);
+            return ctx.task().execute(new T9<>(job, args), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -757,22 +544,22 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param job Job closure.
      * @param args Job arguments.
      * @param rdc Reducer.
-     * @param nodes Grid nodes.
-     * @param execName Custom executor name.
+     * @param opts Task execution options.
      * @return Grid future for execution result.
      */
-    public <T, R1, R2> ComputeTaskInternalFuture<R2> 
callAsync(IgniteClosure<T, R1> job,
-        Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable 
Collection<ClusterNode> nodes,
-        @Nullable String execName) {
+    public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(
+        IgniteClosure<T, R1> job,
+        Collection<? extends T> args,
+        IgniteReducer<R1, R2> rdc,
+        TaskExecutionOptions opts
+    ) {
         busyLock.readLock();
 
         try {
-            if (F.isEmpty(nodes))
+            if (F.isEmpty(opts.projection()))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, 
T10.class, U.emptyTopologyException());
 
-            ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
-            return ctx.task().execute(new T10<>(job, args, rdc), null, false, 
execName);
+            return ctx.task().execute(new T10<>(job, args, rdc), null, opts);
         }
         finally {
             busyLock.readUnlock();
@@ -1175,7 +962,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, 
Collection,String)}.
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, 
TaskExecutionOptions)}.
      */
     private class T1 extends TaskNoReduceAdapter<Void> implements 
GridNoImplicitInjection {
         /** */
@@ -1206,7 +993,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, 
Collection, String)}.
+     * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, 
TaskExecutionOptions)}.
      */
     private class T2 extends TaskNoReduceAdapter<Void> implements 
GridNoImplicitInjection {
         /** */
@@ -1237,7 +1024,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, 
Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)}
+     * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, 
Collection, IgniteReducer, TaskExecutionOptions)}
      */
     private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> 
implements GridNoImplicitInjection {
         /** */
@@ -1423,7 +1210,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, 
Collection, String)}
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, 
TaskExecutionOptions)}
      */
     private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, 
Collection<R>> implements GridNoImplicitInjection {
         /** */
@@ -1466,7 +1253,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
 
     /**
      * Task that is free of dragged in enclosing context for the method
-     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, 
Collection, String)}
+     * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, 
TaskExecutionOptions)}
      */
     private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> 
implements GridNoImplicitInjection {
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
index 48118378e1a..b579fb9ab5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -31,16 +31,14 @@ import 
org.apache.ignite.internal.processors.platform.client.ClientObjectNotific
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import 
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.processors.task.TaskExecutionOptions;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 
 import static 
org.apache.ignite.internal.processors.platform.client.ClientMessageParser.OP_COMPUTE_TASK_FINISHED;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Client compute task.
@@ -101,12 +99,17 @@ class ClientComputeTask implements ClientCloseableResource 
{
         IgnitePredicate<ClusterNode> nodePredicate = F.isEmpty(nodeIds) ? node 
-> !node.isClient() :
             F.nodeForNodeIds(nodeIds);
 
-        task.setThreadContext(TC_SUBGRID_PREDICATE, nodePredicate);
-        task.setThreadContext(TC_TIMEOUT, timeout);
-        task.setThreadContext(TC_NO_FAILOVER, (flags & NO_FAILOVER_FLAG_MASK) 
!= 0);
-        task.setThreadContext(TC_NO_RESULT_CACHE, (flags & 
NO_RESULT_CACHE_FLAG_MASK) != 0);
+        TaskExecutionOptions opts = options()
+            .withProjectionPredicate(nodePredicate)
+            .withTimeout(timeout);
 
-        taskFut = task.execute(taskName, arg);
+        if ((flags & NO_FAILOVER_FLAG_MASK) != 0)
+            opts.withFailoverDisabled();
+
+        if ((flags & NO_RESULT_CACHE_FLAG_MASK) != 0)
+            opts.withResultCacheDisabled();
+
+        taskFut = task.execute(taskName, arg, opts);
 
         // Fail fast.
         if (taskFut.isDone() && taskFut.error() != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 298133233d5..835bc54ef08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -46,8 +46,6 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-
 /**
  * Interop compute.
  */
@@ -291,8 +289,6 @@ public class PlatformCompute extends PlatformAbstractTarget 
{
                 ((PlatformBalancingMultiClosureTask)task).jobs(jobs);
         }
 
-        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, 
computeForPlatform.clusterGroup().nodes());
-
         return executeNative0(task);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 3e46c80290b..3ff4f8ab5f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -121,7 +121,7 @@ import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_S
 import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_UPDATE_TLL;
 import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.DESTROY_CACHE;
 import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.GET_OR_CREATE_CACHE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Command handler for API requests.
@@ -762,11 +762,11 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
         else {
             ClusterGroup prj = 
ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
 
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
-            return ctx.closure().callAsync(BALANCE,
+            return ctx.closure().callAsync(
+                BALANCE,
                 new FlaggedCacheOperationCallable(cacheName, cacheFlags, op, 
key),
-                prj.nodes());
+                options(prj.nodes()).withFailoverDisabled()
+            );
         }
     }
 
@@ -796,11 +796,11 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
         else {
             ClusterGroup prj = 
ctx.grid().cluster().forPredicate(F.nodeForNodeId(destId));
 
-            ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
-            return ctx.closure().callAsync(BALANCE,
+            return ctx.closure().callAsync(
+                BALANCE,
                 new CacheOperationCallable(cacheName, op, key),
-                prj.nodes());
+                options(prj.nodes()).withFailoverDisabled()
+            );
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 4994307b60e..c92986e14b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -77,8 +77,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXE;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.NOOP;
 import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.RESULT;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
@@ -216,22 +215,20 @@ public class GridTaskCommandHandler extends 
GridRestCommandHandlerAdapter {
                 final IgniteInternalFuture<Object> taskFut;
 
                 if (locExec) {
-                    ctx.task().setThreadContext(TC_TIMEOUT, timeout);
-
                     Object arg = !F.isEmpty(params) ? params.size() == 1 ? 
params.get(0) : params.toArray() : null;
 
-                    taskFut = ctx.task().execute(name, arg);
+                    taskFut = ctx.task().execute(name, arg, 
options().withTimeout(timeout));
                 }
                 else {
                     // Using predicate instead of node intentionally
                     // in order to provide user well-structured 
EmptyProjectionException.
                     ClusterGroup prj = 
ctx.grid().cluster().forPredicate(F.nodeForNodeId(req.destinationId()));
 
-                    ctx.task().setThreadContext(TC_NO_FAILOVER, true);
-
                     taskFut = ctx.closure().callAsync(
                         BALANCE,
-                        new ExeCallable(name, params, timeout), prj.nodes());
+                        new ExeCallable(name, params, timeout),
+                        options(prj.nodes()).withFailoverDisabled()
+                    );
                 }
 
                 if (async) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index f9fa1cd9db1..882c46fb85a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryArray;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
 import org.apache.ignite.internal.processors.platform.PlatformNativeException;
 import org.apache.ignite.internal.processors.platform.services.PlatformService;
@@ -64,7 +63,8 @@ import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceCallInterceptor;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * Wrapper for making {@link org.apache.ignite.services.Service} class proxies.
@@ -218,16 +218,16 @@ public class GridServiceProxy<T> implements Serializable {
                         }
                     }
                     else {
-                        ctx.task().setThreadContext(TC_IO_POLICY, 
GridIoPolicy.SERVICE_POOL);
-
                         // Execute service remotely.
-                        return 
unmarshalResult(ctx.closure().callAsyncNoFailover(
-                            GridClosureCallMode.BROADCAST,
-                            new ServiceProxyCallable(methodName(mtd), name, 
mtd.getParameterTypes(), args, callAttrs),
-                            Collections.singleton(node),
-                            false,
-                            waitTimeout,
-                            true).get());
+                        return unmarshalResult(ctx.closure().callAsync(
+                                GridClosureCallMode.BROADCAST,
+                                new ServiceProxyCallable(methodName(mtd), 
name, mtd.getParameterTypes(), args, callAttrs),
+                                options(Collections.singleton(node))
+                                    .withPool(SERVICE_POOL)
+                                    .withFailoverDisabled()
+                                    .withTimeout(waitTimeout)
+                                    .withAuthenticationDisabled()
+                            ).get());
                     }
                 }
                 catch (InvocationTargetException e) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6d4cd232bb4..46425bf4a32 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.task;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -106,11 +105,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
 import static 
org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  * This class defines task processor.
@@ -128,10 +123,6 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
     /** Wait for 5 seconds to allow discovery to take effect (best effort). */
     private static final long DISCO_TIMEOUT = 5000;
 
-    /** */
-    private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP =
-        new EnumMap<>(GridTaskThreadContextKey.class);
-
     /** */
     private final Marshaller marsh;
 
@@ -150,9 +141,6 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
     /** Total executed tasks metric. */
     private final LongAdderMetric execTasks;
 
-    /** */
-    private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = 
new ThreadLocal<>();
-
     /** */
     private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
 
@@ -359,52 +347,6 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
             log.debug("Stopped task processor.");
     }
 
-    /**
-     * Sets the thread-local context value.
-     *
-     * @param key Key.
-     * @param val Value.
-     */
-    public void setThreadContext(GridTaskThreadContextKey key, Object val) {
-        assert key != null;
-        assert val != null;
-
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        // NOTE: access to 'map' is always single-threaded since it's held
-        // in a thread local.
-        if (map == null)
-            thCtx.set(map = new EnumMap<>(GridTaskThreadContextKey.class));
-
-        map.put(key, val);
-    }
-
-    /**
-     * Sets the thread-local context value, if it is not null.
-     *
-     * @param key Key.
-     * @param val Value.
-     */
-    public void setThreadContextIfNotNull(GridTaskThreadContextKey key, 
@Nullable Object val) {
-        if (val != null)
-            setThreadContext(key, val);
-    }
-
-    /**
-     * Gets thread-local context value for a given {@code key}.
-     *
-     * @param key Thread-local context key.
-     * @return Thread-local context value associated with given {@code key} - 
or {@code null}
-     *      if value with given {@code key} doesn't exist.
-     */
-    @Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) {
-        assert (key != null);
-
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        return map == null ? null : (T)map.get(key);
-    }
-
     /**
      * Gets currently used deployments.
      *
@@ -446,19 +388,22 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends 
ComputeTask<T, R>> taskCls, @Nullable T arg) {
-        return execute(taskCls, arg, null);
+        return execute(taskCls, arg, options());
     }
 
     /**
      * @param taskCls Task class.
      * @param arg Optional execution argument.
-     * @param execName Name of the custom executor.
+     * @param opts Task execution options.
      * @return Task future.
      * @param <T> Task argument type.
      * @param <R> Task return value type.
      */
-    public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends 
ComputeTask<T, R>> taskCls, @Nullable T arg,
-        @Nullable String execName) {
+    public <T, R> ComputeTaskInternalFuture<R> execute(
+        Class<? extends ComputeTask<T, R>> taskCls, 
+        @Nullable T arg,
+        TaskExecutionOptions opts
+    ) {
         assert taskCls != null;
 
         lock.readLock();
@@ -467,8 +412,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to 
grid shutdown: " + taskCls);
 
-            return startTask(null, taskCls, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
-                false, execName);
+            return startTask(null, taskCls, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
         }
         finally {
             lock.readUnlock();
@@ -483,52 +427,25 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg) {
-        return execute(task, arg, false, null);
-    }
-
-    /**
-     * @param task Actual task.
-     * @param arg Optional task argument.
-     * @param execName Name of the custom executor.
-     * @return Task future.
-     * @param <T> Task argument type.
-     * @param <R> Task return value type.
-     */
-    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg, String execName) {
-        return execute(task, arg, false, execName);
+        return execute(task, arg, options());
     }
 
     /**
      * @param task Actual task.
      * @param arg Optional task argument.
-     * @param sys If {@code true}, then system pool will be used.
+     * @param opts Task execution options.
      * @return Task future.
      * @param <T> Task argument type.
      * @param <R> Task return value type.
      */
-    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg, boolean sys) {
-        return execute(task, arg, sys, null);
-    }
-
-    /**
-     * @param task Actual task.
-     * @param arg Optional task argument.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Name of the custom executor.
-     * @return Task future.
-     * @param <T> Task argument type.
-     * @param <R> Task return value type.
-     */
-    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg, boolean sys,
-        @Nullable String execName) {
+    public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg, TaskExecutionOptions opts) {
         lock.readLock();
 
         try {
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to 
grid shutdown: " + task);
 
-            return startTask(null, null, task, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
-                sys, execName);
+            return startTask(null, null, task, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
         }
         finally {
             lock.readUnlock();
@@ -567,18 +484,18 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * @param <R> Task return value type.
      */
     public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, 
@Nullable T arg) {
-        return execute(taskName, arg, null);
+        return execute(taskName, arg, options());
     }
 
     /**
      * @param taskName Task name.
      * @param arg Optional execution argument.
-     * @param execName Name of the custom executor.
+     * @param opts Task execution options.
      * @return Task future.
      * @param <T> Task argument type.
      * @param <R> Task return value type.
      */
-    public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, 
@Nullable T arg, @Nullable String execName) {
+    public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, 
@Nullable T arg, @Nullable TaskExecutionOptions opts) {
         assert taskName != null;
 
         lock.readLock();
@@ -587,8 +504,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
             if (stopping)
                 throw new IllegalStateException("Failed to execute task due to 
grid shutdown: " + taskName);
 
-            return startTask(taskName, null, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg,
-                false, execName);
+            return startTask(taskName, null, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, opts);
         }
         finally {
             lock.readUnlock();
@@ -599,10 +515,8 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * @param taskName Task name.
      * @param taskCls Task class.
      * @param task Task.
-     * @param sesId Task session ID.
      * @param arg Optional task argument.
-     * @param sys If {@code true}, then system pool will be used.
-     * @param execName Name of the custom executor.
+     * @param opts Task execution options.
      * @return Task future.
      */
     private <T, R> ComputeTaskInternalFuture<R> startTask(
@@ -611,8 +525,8 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
         @Nullable ComputeTask<T, R> task,
         IgniteUuid sesId,
         @Nullable T arg,
-        boolean sys,
-        @Nullable String execName) {
+        @Nullable TaskExecutionOptions opts
+    ) {
         assert sesId != null;
 
         String taskClsName;
@@ -626,21 +540,12 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
         else
             taskClsName = taskCls != null ? taskCls.getName() : taskName;
 
-        // Get values from thread-local context.
-        Map<GridTaskThreadContextKey, Object> map = thCtx.get();
-
-        if (map == null)
-            map = EMPTY_ENUM_MAP;
-        else
-            // Reset thread-local context.
-            thCtx.set(null);
-
-        if (map.get(TC_SKIP_AUTH) == null)
+        if (!opts.isAuthenticationDisabled())
             ctx.security().authorize(taskClsName, 
SecurityPermission.TASK_EXECUTE);
 
-        Long timeout = (Long)map.get(TC_TIMEOUT);
+        assert opts.timeout() >= 0;
 
-        long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : 
timeout;
+        long timeout0 = opts.timeout() == 0 ? Long.MAX_VALUE : opts.timeout();
 
         long startTime = U.currentTimeMillis();
 
@@ -694,7 +599,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
                     throw new IgniteDeploymentCheckedException("Failed to 
auto-deploy task " +
                         "(was task (re|un)deployed?): " + taskCls);
 
-                taskName = taskName(dep, taskCls, map);
+                taskName = taskName(dep, taskCls, opts);
             }
             catch (IgniteCheckedException e) {
                 taskName = taskCls.getName();
@@ -734,7 +639,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
                     throw new IgniteDeploymentCheckedException("Failed to 
auto-deploy task " +
                         "(was task (re|un)deployed?): " + cls);
 
-                taskName = taskName(dep, taskCls, map);
+                taskName = taskName(dep, taskCls, opts);
             }
             catch (IgniteCheckedException e) {
                 taskName = task.getClass().getName();
@@ -753,10 +658,10 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
 
         Collection<UUID> top = null;
 
-        final IgnitePredicate<ClusterNode> topPred = 
(IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE);
+        final IgnitePredicate<ClusterNode> topPred = 
opts.projectionPredicate();
 
         if (topPred == null) {
-            final Collection<ClusterNode> nodes = 
(Collection<ClusterNode>)map.get(TC_SUBGRID);
+            final Collection<ClusterNode> nodes = opts.projection();
 
             top = nodes != null ? F.nodeIds(nodes) : null;
         }
@@ -783,7 +688,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
             emptyMap(),
             fullSup,
             internal,
-            execName,
+            opts.executor(),
             ctx.security().enabled() ? 
ctx.security().securityContext().subject().login() : null
         );
 
@@ -813,7 +718,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
                     task,
                     dep,
                     new TaskEventListener(),
-                    map,
+                    opts,
                     securitySubjectId(ctx));
 
                 GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, 
taskWorker);
@@ -842,7 +747,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
                     if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != 
null) {
                         try {
                             // Start task execution in another thread.
-                            if (sys)
+                            if (opts.isSystemTask())
                                 
ctx.pools().getSystemExecutorService().execute(taskWorker);
                             else
                                 
ctx.pools().getExecutorService().execute(taskWorker);
@@ -906,15 +811,14 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      *
      * @param dep Deployment.
      * @param cls Class.
-     * @param map Thread context map.
+     * @param opts Task execution options.
      * @return Task name.
      * @throws IgniteCheckedException If {@link @ComputeTaskName} annotation 
is found, but has empty value.
      */
-    private String taskName(GridDeployment dep, Class<?> cls,
-        Map<GridTaskThreadContextKey, Object> map) throws 
IgniteCheckedException {
+    private String taskName(GridDeployment dep, Class<?> cls, 
TaskExecutionOptions opts) throws IgniteCheckedException {
         assert dep != null;
         assert cls != null;
-        assert map != null;
+        assert opts != null;
 
         String taskName;
 
@@ -928,7 +832,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
implements IgniteCha
                     " cannot be empty for class: " + cls);
         }
         else
-            taskName = map.containsKey(TC_TASK_NAME) ? 
(String)map.get(TC_TASK_NAME) : cls.getName();
+            taskName = opts.name().orElse(cls.getName());
 
         return taskName;
     }
@@ -1015,7 +919,7 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * @param ses Task session.
      * @throws IgniteCheckedException If send to any of the jobs failed.
      */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"BusyWait"})
+    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
     private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl 
ses)
         throws IgniteCheckedException {
         assert attrs != null;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
deleted file mode 100644
index 59249eca9c9..00000000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.task;
-
-/**
- * Defines keys for thread-local context in task processor.
- */
-public enum GridTaskThreadContextKey {
-    /** Task name. */
-    TC_TASK_NAME,
-
-    /** No failover flag. */
-    TC_NO_FAILOVER,
-
-    /** No result cache flag. */
-    TC_NO_RESULT_CACHE,
-
-    /** Projection for the task. */
-    TC_SUBGRID,
-
-    /** Projection predicate for the task. */
-    TC_SUBGRID_PREDICATE,
-
-    /** Timeout in milliseconds associated with the task. */
-    TC_TIMEOUT,
-
-    /** IO manager policy. */
-    TC_IO_POLICY,
-
-    /** Skip authorization for the task. */
-    TC_SKIP_AUTH
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 8b8edcf3c50..fff823d2d37 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -107,9 +107,6 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
 import static 
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
 import static 
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FAILED;
 import static 
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FINISHED;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_RESULT_CACHE;
 
 /**
  * Grid task worker. Handles full task life cycle.
@@ -177,8 +174,8 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
     /** Task class. */
     private final Class<?> taskCls;
 
-    /** Optional subgrid. */
-    private final Map<GridTaskThreadContextKey, Object> thCtx;
+    /** Task execution options. */
+    private final TaskExecutionOptions opts;
 
     /** */
     private ComputeTask<T, R> task;
@@ -285,7 +282,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
      * @param task Task instance that might be null.
      * @param dep Deployed task.
      * @param evtLsnr Event listener.
-     * @param thCtx Thread-local context from task processor.
+     * @param opts Task execution options.
      * @param subjId Subject ID.
      */
     GridTaskWorker(
@@ -297,7 +294,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
         @Nullable ComputeTask<T, R> task,
         GridDeployment dep,
         GridTaskEventListener evtLsnr,
-        @Nullable Map<GridTaskThreadContextKey, Object> thCtx,
+        TaskExecutionOptions opts,
         UUID subjId) {
         super(ctx.config().getIgniteInstanceName(), "grid-task-worker", 
ctx.log(GridTaskWorker.class));
 
@@ -314,7 +311,7 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
         this.task = task;
         this.dep = dep;
         this.evtLsnr = evtLsnr;
-        this.thCtx = thCtx;
+        this.opts = opts;
         this.subjId = subjId;
 
         log = U.logger(ctx, logRef, this);
@@ -323,13 +320,9 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
 
         boolean noResCacheAnnotation = dep.annotation(taskCls, 
ComputeTaskNoResultCache.class) != null;
 
-        Boolean noResCacheCtxFlag = getThreadContext(TC_NO_RESULT_CACHE);
+        resCache = !(noResCacheAnnotation || opts.isResultCacheDisabled());
 
-        resCache = !(noResCacheAnnotation || (noResCacheCtxFlag != null && 
noResCacheCtxFlag));
-
-        Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
-
-        this.noFailover = noFailover != null ? noFailover : false;
+        this.noFailover = opts.isFailoverDisabled();
 
         if (task instanceof AffinityTask) {
             AffinityTask affTask = (AffinityTask)task;
@@ -356,16 +349,6 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
         }
     }
 
-    /**
-     * Gets value from thread-local context.
-     *
-     * @param key Thread-local context key.
-     * @return Thread-local context value, if any.
-     */
-    @Nullable private <V> V getThreadContext(GridTaskThreadContextKey key) {
-        return thCtx == null ? null : (V)thCtx.get(key);
-    }
-
     /**
      * @return Task session ID.
      */
@@ -1448,14 +1431,8 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
 
                         if (internal)
                             plc = MANAGEMENT_POOL;
-                        else {
-                            Byte ctxPlc = getThreadContext(TC_IO_POLICY);
-
-                            if (ctxPlc != null)
-                                plc = ctxPlc;
-                            else
-                                plc = PUBLIC_POOL;
-                        }
+                        else
+                            plc = opts.pool().orElse(PUBLIC_POOL);
 
                         // Send job execution request.
                         ctx.io().sendToGridTopic(node, TOPIC_JOB, req, plc);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
new file mode 100644
index 00000000000..f30630d4423
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/TaskExecutionOptions.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.task;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/** Mutable holder of per-call task execution options; each {@code with*}/{@code as*} call returns {@code this}. */
+public class TaskExecutionOptions {
+    /** Task name, {@code null} if not set. */
+    private String name;
+
+    /** Task timeout, {@code 0} if not set. */
+    private long timeout;
+
+    /** Name of the custom executor, {@code null} if not set. */
+    private String execName;
+
+    /** IO policy (pool) for the task, {@code null} if not set. */
+    private Byte pool;
+
+    /** Node projection for the task, {@code null} if not set. */
+    private Collection<ClusterNode> projection;
+
+    /** Node projection predicate for the task, {@code null} if not set. */
+    private IgnitePredicate<ClusterNode> projectionPredicate;
+
+    /** Whether failover is disabled for the task. */
+    private boolean isFailoverDisabled;
+
+    /** Whether result caching is disabled for the task. */
+    private boolean isResultCacheDisabled;
+
+    /** Whether the task is marked as a system task. */
+    private boolean isSysTask;
+
+    /** Whether the security check is disabled for the task. */
+    private boolean isAuthDisabled;
+
+    /** Private constructor; instances are obtained through the static {@code options} factory methods. */
+    private TaskExecutionOptions() {}
+
+    /** @return New options instance with nothing set. */
+    public static TaskExecutionOptions options() {
+        return new TaskExecutionOptions();
+    }
+
+    /** @return New options instance with the given node projection already set. */
+    public static TaskExecutionOptions options(Collection<ClusterNode> 
projection) {
+        return new TaskExecutionOptions().withProjection(projection);
+    }
+
+    /** @return Task timeout, {@code 0} if it was not set. */
+    public long timeout() {
+        return timeout;
+    }
+
+    /** Sets the task timeout and returns {@code this} for chaining. */
+    public TaskExecutionOptions withTimeout(long timeout) {
+        this.timeout = timeout;
+
+        return this;
+    }
+
+    /** @return Task name, empty if it was not set. */
+    public Optional<String> name() {
+        return Optional.ofNullable(name);
+    }
+
+    /** Sets the task name and returns {@code this} for chaining. */
+    public TaskExecutionOptions withName(String name) {
+        this.name = name;
+
+        return this;
+    }
+
+    /** @return Node projection for the task, {@code null} if it was not set. */
+    public Collection<ClusterNode> projection() {
+        return projection;
+    }
+
+    /** Sets the node projection and returns {@code this} for chaining. */
+    public TaskExecutionOptions withProjection(Collection<ClusterNode> 
projection) {
+        this.projection = projection;
+
+        return this;
+    }
+
+    /** @return Node projection predicate for the task, {@code null} if it was not set. */
+    public IgnitePredicate<ClusterNode> projectionPredicate() {
+        return projectionPredicate;
+    }
+
+    /** Sets the node projection predicate and returns {@code this} for chaining. */
+    public TaskExecutionOptions 
withProjectionPredicate(IgnitePredicate<ClusterNode> projectionPredicate) {
+        this.projectionPredicate = projectionPredicate;
+
+        return this;
+    }
+
+    /** @return Name of the custom executor, {@code null} if it was not set. */
+    public String executor() {
+        return execName;
+    }
+
+    /** Sets the custom executor name and returns {@code this} for chaining. */
+    public TaskExecutionOptions withExecutor(String execName) {
+        this.execName = execName;
+
+        return this;
+    }
+
+    /** @return IO policy (pool) for the task, empty if it was not set. */
+    public Optional<Byte> pool() {
+        return Optional.ofNullable(pool);
+    }
+
+    /** Sets the IO policy (pool) and returns {@code this} for chaining. */
+    public TaskExecutionOptions withPool(byte pool) {
+        this.pool = pool;
+
+        return this;
+    }
+
+    /** @return {@code true} if failover was disabled via {@link #withFailoverDisabled()}. */
+    public boolean isFailoverDisabled() {
+        return isFailoverDisabled;
+    }
+
+    /** Disables failover and returns {@code this} for chaining. */
+    public TaskExecutionOptions withFailoverDisabled() {
+        isFailoverDisabled = true;
+
+        return this;
+    }
+
+    /** @return {@code true} if result caching was disabled via {@link #withResultCacheDisabled()}. */
+    public boolean isResultCacheDisabled() {
+        return isResultCacheDisabled;
+    }
+
+    /** Disables result caching and returns {@code this} for chaining. */
+    public TaskExecutionOptions withResultCacheDisabled() {
+        isResultCacheDisabled = true;
+
+        return this;
+    }
+
+    /** @return {@code true} if the task was marked as a system task via {@link #asSystemTask()}. */
+    public boolean isSystemTask() {
+        return isSysTask;
+    }
+
+    /** Marks the task as a system task and returns {@code this} for chaining. */
+    public TaskExecutionOptions asSystemTask() {
+        isSysTask = true;
+
+        return this;
+    }
+
+    /** @return {@code true} if the security check was disabled via {@link #withAuthenticationDisabled()}. */
+    public boolean isAuthenticationDisabled() {
+        return isAuthDisabled;
+    }
+
+    /** Disables the security check and returns {@code this} for chaining. */
+    public TaskExecutionOptions withAuthenticationDisabled() {
+        isAuthDisabled = true;
+
+        return this;
+    }
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 2d74bc0a629..be011d4bf45 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1791,10 +1791,10 @@ 
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult
 
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch
 org.apache.ignite.internal.processors.service.ServiceUndeploymentRequest
 org.apache.ignite.internal.processors.task.GridTaskProcessor$1
-org.apache.ignite.internal.processors.task.GridTaskThreadContextKey
 org.apache.ignite.internal.processors.task.GridTaskWorker$2
 org.apache.ignite.internal.processors.task.GridTaskWorker$4
 org.apache.ignite.internal.processors.task.GridTaskWorker$State
+org.apache.ignite.internal.processors.task.TaskExecutionOptions
 org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatusEnum
 org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor$2
 org.apache.ignite.internal.processors.tracing.SpanType
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index efe8baf1d6a..a1d5a5bc667 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -58,6 +58,9 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
+
 /**
  * Continuous task test.
  */
@@ -203,15 +206,18 @@ public class GridContinuousTaskSelfTest extends 
GridCommonAbstractTest {
         try {
             IgniteEx ign = startGrid(0);
 
-            ComputeTaskInternalFuture<String> fut = 
ign.context().closure().callAsync(GridClosureCallMode.BALANCE, new 
Callable<String>() {
-                /** */
-                @IgniteInstanceResource
-                private IgniteEx g;
+            IgniteInternalFuture<String> fut = 
ign.context().closure().callAsync(
+                BALANCE,
+                new Callable<String>() {
+                    @IgniteInstanceResource
+                    private IgniteEx g;
 
-                @Override public String call() throws Exception {
-                    return 
g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
-                }
-            }, ign.cluster().nodes());
+                    @Override public String call() throws Exception {
+                        return 
g.compute(g.cluster()).execute(NestedHoldccTask.class, null);
+                    }
+                },
+                options(ign.cluster().nodes())
+            );
 
             assertEquals("DONE", fut.get(3000));
         }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
index 0b9b7802b60..28c378d2aac 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
+import static 
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
 
 /**
  *
@@ -82,7 +83,8 @@ public class IgniteComputeTopologyExceptionTest extends 
GridCommonAbstractTest {
 
         stopGrid(1);
 
-        IgniteInternalFuture<?> fut = 
ignite0.context().closure().callAsyncNoFailover(BALANCE,
+        IgniteInternalFuture<?> fut = ignite0.context().closure().callAsync(
+            BALANCE,
             new IgniteCallable<Object>() {
                 @Override public Object call() throws Exception {
                     fail("Should not be called.");
@@ -90,9 +92,8 @@ public class IgniteComputeTopologyExceptionTest extends 
GridCommonAbstractTest {
                     return null;
                 }
             },
-            nodes,
-            false,
-            0, false);
+            options(nodes).withFailoverDisabled()
+        );
 
         try {
             fut.get();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
new file mode 100644
index 00000000000..5e2fc4dbdb6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/TaskOptionsPropagationTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.TaskSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** Tests that a task name set via {@code IgniteCompute#withName} is seen by the executed computation. */
+public class TaskOptionsPropagationTest extends GridCommonAbstractTest {
+    /** Task name that the tests pass to {@code IgniteCompute#withName}. */
+    private static final String TEST_TASK_NAME = "test-name";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        grid().createCache(DEFAULT_CACHE_NAME).put(0, 0); // Cache targeted by the affinity calls in the tests.
+    }
+
+    /** Checks that a named {@code affinityCall} from a client node runs under the custom task name. */
+    @Test
+    public void testUserTaskOptionsWithPrecedingSystemTaskExecution() throws 
Exception {
+        try (IgniteEx cli = startClientGrid(1)) {
+            cli.compute().withName(TEST_TASK_NAME).affinityCall(
+                DEFAULT_CACHE_NAME,
+                0,
+                new TestCallable(TEST_TASK_NAME)
+            );
+        }
+    }
+
+    /** Checks that {@code withName} does not affect a call made on the same instance via {@code runAsync}. */
+    @Test
+    public void testComputeSharedAcrossMultipleThreads() throws Exception {
+        IgniteCompute compute = grid().compute();
+
+        compute.withName(TEST_TASK_NAME);
+
+        runAsync(() -> compute.call(new 
TestCallable(TestCallable.class.getName()))).get();
+
+        compute.call(new TestCallable(TEST_TASK_NAME));
+    }
+
+    /** Runs {@link #check} against the sync and async variants of {@link IgniteCompute} execution methods. */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testTaskExecutionOptionsReset() throws Exception {
+        check(ComputeTask.class, (c, t) -> c.execute(t, null));
+        check(ComputeTask.class, (c, t) -> c.executeAsync(t, null).get());
+
+        check(IgniteCallable.class, IgniteCompute::call);
+        check(IgniteCallable.class, (c, t) -> c.callAsync(t).get());
+
+        check(IgniteCallable.class, (c, t) -> 
c.call((toList((IgniteCallable<Void>)t))));
+        check(IgniteCallable.class, (c, t) -> 
c.callAsync(toList((IgniteCallable<Void>)t)).get());
+
+        check(IgniteCallable.class, (c, t) -> 
c.call(toList((IgniteCallable<Void>)t), new TestReducer()));
+        check(IgniteCallable.class, (c, t) -> 
c.callAsync(toList((IgniteCallable<Void>)t), new TestReducer()).get());
+
+        check(IgniteRunnable.class, IgniteCompute::run);
+        check(IgniteRunnable.class, (c, t) -> c.runAsync(t).get());
+
+        check(IgniteRunnable.class, (c, t) -> c.run(toList(t)));
+        check(IgniteRunnable.class, (c, t) -> c.runAsync(toList(t)).get());
+
+        check(IgniteRunnable.class, IgniteCompute::broadcast);
+        check(IgniteRunnable.class, (c, t) -> c.broadcastAsync(t).get());
+
+        check(IgniteCallable.class, IgniteCompute::broadcast);
+        check(IgniteCallable.class, (c, t) -> c.broadcastAsync(t).get());
+
+        check(IgniteClosure.class, ((c, t) -> c.broadcast(t, null)));
+        check(IgniteClosure.class, (c, t) -> c.broadcastAsync(t, null).get());
+
+        check(IgniteClosure.class, (c, t) -> c.apply(t, (Void)null));
+        check(IgniteClosure.class, (c, t) -> c.applyAsync(t, 
(Void)null).get());
+
+        check(IgniteClosure.class, (c, t) -> c.apply(t, singletonList(null), 
new TestReducer()));
+        check(IgniteClosure.class, (c, t) -> c.applyAsync(t, 
singletonList(null), new TestReducer()).get());
+
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRun(DEFAULT_CACHE_NAME, "key", t));
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRunAsync(DEFAULT_CACHE_NAME, "key", t).get());
+
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRun(singletonList(DEFAULT_CACHE_NAME), "key", t));
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), "key", t).get());
+
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRun(singletonList(DEFAULT_CACHE_NAME), 0, t));
+        check(IgniteRunnable.class, (c, t) -> 
c.affinityRunAsync(singletonList(DEFAULT_CACHE_NAME), 0, t).get());
+
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCall(DEFAULT_CACHE_NAME, "key", t));
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCallAsync(DEFAULT_CACHE_NAME, "key", t).get());
+
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCall(singletonList(DEFAULT_CACHE_NAME), "key", t));
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), "key", t).get());
+
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCall(singletonList(DEFAULT_CACHE_NAME), 0, t));
+        check(IgniteCallable.class, (c, t) -> 
c.affinityCallAsync(singletonList(DEFAULT_CACHE_NAME), 0, t).get());
+    }
+
+    /** Checks that a name set via {@code withName} is used by that call only, whether it succeeds or fails. */
+    public <T> void check(Class<T> taskCls, ComputationConsumer<T> consumer) 
throws Exception {
+        consumer.accept(grid().compute().withName(TEST_TASK_NAME), 
getComputationObject(taskCls, TEST_TASK_NAME));
+        consumer.accept(grid().compute(), getComputationObject(taskCls, null));
+
+        assertThrows(() -> 
consumer.accept(grid().compute().withName(TEST_TASK_NAME), null));
+        consumer.accept(grid().compute(), getComputationObject(taskCls, null));
+    }
+
+    /** Callable that checks the task name of its session when called. */
+    private static class TestCallable extends TaskNameChecker implements 
IgniteCallable<Void> {
+        /** @param expName Expected task name, {@code null} to expect this class name. */
+        public TestCallable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** Runnable that checks the task name of its session when run. */
+    private static class TestRunnable extends TaskNameChecker implements 
IgniteRunnable {
+        /** @param expName Expected task name, {@code null} to expect this class name. */
+        public TestRunnable(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            checkName();
+        }
+    }
+
+    /** Closure that checks the task name of its session when applied. */
+    private static class TestClosure extends TaskNameChecker implements 
IgniteClosure<Void, Void> {
+        /** @param expName Expected task name, {@code null} to expect this class name. */
+        public TestClosure(String expName) {
+            super(expName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void apply(Void arg) {
+            checkName();
+
+            return null;
+        }
+    }
+
+    /** Task that maps a single {@code TestJob} to the first node of the subgrid. */
+    private static class TestTask extends ComputeTaskAdapter<Void, Void> {
+        /** Task name that the mapped job expects to observe. */
+        private final String name;
+
+        /** @param name Expected task name, {@code null} to expect this task's class name. */
+        public TestTask(String name) {
+            this.name = name == null ? getClass().getName() : name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+            List<ClusterNode> subgrid,
+            @Nullable Void arg
+        ) throws IgniteException {
+            return singletonMap(new TestJob(name), subgrid.iterator().next());
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Void reduce(List list) throws 
IgniteException {
+            return null;
+        }
+
+        /** Job that checks the task name of its session when executed. */
+        private static class TestJob extends TaskNameChecker implements 
ComputeJob {
+            /** @param expName Expected task name. */
+            public TestJob(String expName) {
+                super(expName);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Object execute() throws IgniteException {
+                checkName();
+
+                return null;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void cancel() {
+                // No-op.
+            }
+        }
+    }
+
+    /** Base class that compares the task name of its session with an expected one. */
+    private static class TaskNameChecker {
+        /** Task session, marked for injection with {@code @TaskSessionResource}. */
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** Expected task name. */
+        private final String expName;
+
+        /** @param expName Expected task name, {@code null} to expect the runtime class name. */
+        public TaskNameChecker(String expName) {
+            this.expName = expName == null ? getClass().getName() : expName;
+        }
+
+        /** Asserts that the session's task name equals the expected one. */
+        protected void checkName() {
+            assertEquals(expName, ses.getTaskName());
+        }
+    }
+
+    /** Passes {@code r} to {@code GridTestUtils.assertThrowsWithCause} with {@code Exception} as the cause type. */
+    private void assertThrows(RunnableX r) {
+        GridTestUtils.assertThrowsWithCause(r, Exception.class);
+    }
+
+    /** Reducer whose {@code collect} always returns {@code false} and whose result is {@code null}. */
+    private static class TestReducer implements IgniteReducer<Void, Void> {
+        /** {@inheritDoc} */
+        @Override public boolean collect(@Nullable Void o) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void reduce() {
+            return null;
+        }
+    }
+
+    /** @return Singleton list holding {@code t}, or {@code null} if {@code t} is {@code null}. */
+    private static <T> Collection<T> toList(T t) {
+        return t == null ? null : singletonList(t);
+    }
+
+    /** Action over a compute instance and a computation object; may throw a checked exception. */
+    private interface ComputationConsumer<T> {
+        /** Runs the action for compute {@code c} and computation object {@code t}. */
+        void accept(IgniteCompute c, T t) throws Exception;
+    }
+
+    /** @return Test computation object for {@code taskCls}, constructed with {@code name} as the expected name. */
+    private <T> T getComputationObject(Class<T> taskCls, String name) {
+        if (ComputeTask.class.equals(taskCls))
+            return (T)new TestTask(name);
+        else if (IgniteClosure.class.equals(taskCls))
+            return (T)new TestClosure(name);
+        else if (IgniteCallable.class.equals(taskCls))
+            return (T)new TestCallable(name);
+        else if (IgniteRunnable.class.equals(taskCls))
+            return (T)new TestRunnable(name);
+        else
+            throw new IllegalStateException();
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index f34166a6a09..922325aff53 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -87,6 +87,7 @@ import 
org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutor
 import 
org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
 import org.apache.ignite.internal.processors.compute.InterruptComputeJobTest;
 import 
org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
+import 
org.apache.ignite.internal.processors.compute.TaskOptionsPropagationTest;
 import org.apache.ignite.internal.util.StripedExecutorTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
 import org.apache.ignite.p2p.GridMultinodeRedeployIsolatedModeSelfTest;
@@ -183,7 +184,8 @@ import org.junit.runners.Suite;
     ComputeJobChangePriorityTest.class,
     ComputeJobStatusTest.class,
     ComputeTaskWithWithoutFullSupportTest.class,
-    InterruptComputeJobTest.class
+    InterruptComputeJobTest.class,
+    TaskOptionsPropagationTest.class
 })
 public class IgniteComputeGridTestSuite {
 }


Reply via email to