This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8a36d7d255c IGNITE-27196 Added authorization of Compute Job
cancellation (#12542)
8a36d7d255c is described below
commit 8a36d7d255c1f8fc2296f7eda96b5cce554543df
Author: Mikhail Petrov <[email protected]>
AuthorDate: Wed Dec 3 11:01:22 2025 +0300
IGNITE-27196 Added authorization of Compute Job cancellation (#12542)
---
.../ignite/common/ComputeTaskPermissionsTest.java | 176 ++++++++++++++++-----
.../apache/ignite/internal/GridJobSessionImpl.java | 2 +-
.../internal/processors/job/GridJobProcessor.java | 145 +++++++++++------
.../processors/task/GridTaskProcessor.java | 2 +-
.../internal/processors/task/GridTaskWorker.java | 13 +-
5 files changed, 243 insertions(+), 95 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index 390478bef79..02321a705a2 100644
---
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -40,8 +40,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
@@ -50,6 +53,7 @@ import
org.apache.ignite.configuration.ThinClientConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
+import
org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.PublicAccessJob;
import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -71,6 +75,8 @@ import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -92,6 +98,7 @@ import static
org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE;
import static
org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS;
import static
org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create;
import static
org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -111,10 +118,10 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
private static final ComputeTask SYSTEM_TASK = new
VerifyBackupPartitionsTask();
/** */
- private static final AtomicInteger EXECUTED_TASK_CNTR = new
AtomicInteger();
+ private static final AtomicInteger EXECUTED_JOB_CNT = new AtomicInteger();
/** */
- private static final AtomicInteger CANCELLED_TASK_CNTR = new
AtomicInteger();
+ private static final AtomicInteger CANCELLED_JOB_CNT = new AtomicInteger();
/** */
private static final String CACHE = DEFAULT_CACHE_NAME;
@@ -128,6 +135,9 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
/** */
public static CountDownLatch taskUnblockedLatch;
+ /** */
+ private static ListeningTestLogger listeningLog;
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -138,6 +148,8 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
PublicAccessSystemJob.class
);
+ listeningLog = new ListeningTestLogger(log);
+
for (int idx = 0; idx < SRV_NODES_CNT; idx++)
startGrid(idx, false);
@@ -151,6 +163,15 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
grid(0).createCache(CACHE);
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(
+ String instanceName,
+ AbstractTestSecurityPluginProvider pluginProv
+ ) throws Exception {
+ return super.getConfiguration(instanceName, pluginProv)
+ .setGridLogger(listeningLog);
+ }
+
/** */
private IgniteEx startGrid(int idx, boolean isClient) throws Exception {
String login = getTestIgniteInstanceName(idx);
@@ -373,6 +394,82 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
checkCallable(c -> executorService(initiator,
executor).invokeAny(singletonList(c), getTestTimeout(), MILLISECONDS));
}
+ /** */
+ @Test
+ public void testJobCancelAuthorizationSucceeded() throws Exception {
+ taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
+ taskUnblockedLatch = new CountDownLatch(1);
+
+ CANCELLED_JOB_CNT.set(0);
+ EXECUTED_JOB_CNT.set(0);
+
+ ComputeTaskFuture<Object> fut =
grid(0).compute().executeAsync(CancelAllowedTask.class, null);
+
+ taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
+
+ for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings())
+ sibling.cancel();
+
+ fut.get(getTestTimeout());
+
+ assertTrue(waitForCondition(() -> SRV_NODES_CNT ==
CANCELLED_JOB_CNT.get(), getTestTimeout()));
+ assertEquals(0, EXECUTED_JOB_CNT.get());
+ }
+
+ /** */
+ @Test
+ public void testJobCancelAuthorizationFailed() throws Exception {
+ taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
+ taskUnblockedLatch = new CountDownLatch(1);
+
+ CANCELLED_JOB_CNT.set(0);
+ EXECUTED_JOB_CNT.set(0);
+
+ ComputeTaskFuture<Object> fut =
grid(0).compute().executeAsync(CancelForbiddenTask.class, null);
+
+ taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
+
+ for (ComputeJobSibling sibling :
fut.getTaskSession().getJobSiblings()) {
+ LogListener logLsnr = LogListener.matches("Failed to cancel Ignite
Compute Task Job" +
+ " [sesId=" + fut.getTaskSession().getId() +
+ ", jobId=" + sibling.getJobId() + ']'
+ ).build();
+
+ listeningLog.registerListener(logLsnr);
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27195
Authorization errors during Compute Job
+ // cancellation do not propagate from remote nodes back to the
one that initiated cancellation.
+ if (grid(0).context().job().activeJob(sibling.getJobId()) != null)
{
+ assertThrowsAnyCause(
+ log,
+ () -> {
+ sibling.cancel();
+
+ return null;
+ },
+ SecurityException.class,
+ "Authorization failed"
+ );
+ }
+ else
+ sibling.cancel();
+
+ logLsnr.check(getTestTimeout());
+ }
+
+ assertEquals(0, EXECUTED_JOB_CNT.get());
+ assertEquals(0, CANCELLED_JOB_CNT.get());
+
+ assertFalse(fut.isDone());
+
+ taskUnblockedLatch.countDown();
+
+ fut.get(getTestTimeout());
+
+ assertTrue(waitForCondition(() -> SRV_NODES_CNT ==
EXECUTED_JOB_CNT.get(), getTestTimeout()));
+ assertEquals(0, CANCELLED_JOB_CNT.get());
+ }
+
/** */
@Test
public void testSystemTaskCancel() throws Exception {
@@ -420,42 +517,37 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
taskStartedLatch = new CountDownLatch(expTaskCnt);
taskUnblockedLatch = new CountDownLatch(1);
- CANCELLED_TASK_CNTR.set(0);
- EXECUTED_TASK_CNTR.set(0);
+ CANCELLED_JOB_CNT.set(0);
+ EXECUTED_JOB_CNT.set(0);
- try {
- Future<?> fut = taskStarter.get();
+ Future<?> fut = taskStarter.get();
- assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
+ assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
- try (
- OperationSecurityContext ignored = initiator == null
- ? null
- : grid(0).context().security().withContext(initiator)
- ) {
- if (expE == null) {
- fut.cancel(true);
+ try (
+ OperationSecurityContext ignored = initiator == null
+ ? null
+ : grid(0).context().security().withContext(initiator)
+ ) {
+ if (expE == null) {
+ fut.cancel(true);
- assertTrue(fut.isCancelled());
+ assertTrue(fut.isCancelled());
- assertTrue(waitForCondition(() -> expTaskCnt ==
CANCELLED_TASK_CNTR.get(), getTestTimeout()));
+ assertTrue(waitForCondition(() -> expTaskCnt ==
CANCELLED_JOB_CNT.get(), getTestTimeout()));
- assertEquals(0, EXECUTED_TASK_CNTR.get());
- }
- else {
- assertThrowsWithCause(() -> fut.cancel(true), expE);
+ assertEquals(0, EXECUTED_JOB_CNT.get());
+ }
+ else {
+ assertThrowsWithCause(() -> fut.cancel(true), expE);
- assertFalse(fut.isCancelled());
+ assertFalse(fut.isCancelled());
- taskUnblockedLatch.countDown();
+ taskUnblockedLatch.countDown();
- assertTrue(waitForCondition(() -> expTaskCnt ==
EXECUTED_TASK_CNTR.get(), getTestTimeout()));
- }
+ assertTrue(waitForCondition(() -> expTaskCnt ==
EXECUTED_JOB_CNT.get(), getTestTimeout()));
}
}
- finally {
- taskUnblockedLatch.countDown();
- }
}
/** */
@@ -516,16 +608,16 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
/** */
private void assertCompleted(RunnableX r, int expCnt) {
- EXECUTED_TASK_CNTR.set(0);
+ EXECUTED_JOB_CNT.set(0);
r.run();
- assertEquals(expCnt, EXECUTED_TASK_CNTR.get());
+ assertEquals(expCnt, EXECUTED_JOB_CNT.get());
}
/** */
private void assertFailed(RunnableX r) {
- EXECUTED_TASK_CNTR.set(0);
+ EXECUTED_JOB_CNT.set(0);
try {
r.run();
@@ -547,7 +639,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
fail();
- assertEquals(0, EXECUTED_TASK_CNTR.get());
+ assertEquals(0, EXECUTED_JOB_CNT.get());
}
/** */
@@ -629,7 +721,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
private abstract static class AbstractRunnable implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
- EXECUTED_TASK_CNTR.incrementAndGet();
+ EXECUTED_JOB_CNT.incrementAndGet();
}
}
@@ -637,7 +729,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
private abstract static class AbstractCallable implements
IgniteCallable<AtomicInteger> {
/** {@inheritDoc} */
@Override public AtomicInteger call() throws Exception {
- EXECUTED_TASK_CNTR.incrementAndGet();
+ EXECUTED_JOB_CNT.incrementAndGet();
return new AtomicInteger(0);
}
@@ -647,7 +739,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
private abstract static class AbstractClosure implements
IgniteClosure<Boolean, Boolean> {
/** {@inheritDoc} */
@Override public Boolean apply(Boolean o) {
- EXECUTED_TASK_CNTR.incrementAndGet();
+ EXECUTED_JOB_CNT.incrementAndGet();
return null;
}
@@ -681,9 +773,19 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
/** {@inheritDoc} */
@Override public @Nullable Object reduce(List<ComputeJobResult>
results) throws IgniteException {
+ for (ComputeJobResult res : results) {
+ if (!res.isCancelled() && res.getException() != null)
+ throw res.getException();
+ }
+
return null;
}
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
+ return ComputeJobResultPolicy.WAIT;
+ }
+
/** */
protected ComputeJob job() {
return new TestJob();
@@ -707,7 +809,7 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
/** {@inheritDoc} */
@Override public Object execute() {
- EXECUTED_TASK_CNTR.incrementAndGet();
+ EXECUTED_JOB_CNT.incrementAndGet();
return null;
}
@@ -725,14 +827,14 @@ public class ComputeTaskPermissionsTest extends
AbstractSecurityTest {
taskUnblockedLatch.await(5_000, MILLISECONDS);
}
catch (InterruptedException e) {
- CANCELLED_TASK_CNTR.incrementAndGet();
+ CANCELLED_JOB_CNT.incrementAndGet();
Thread.currentThread().interrupt();
throw new IgniteException(e);
}
- EXECUTED_TASK_CNTR.incrementAndGet();
+ EXECUTED_JOB_CNT.incrementAndGet();
return null;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
index 5d98789420c..219f424f3c5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
@@ -73,7 +73,7 @@ public class GridJobSessionImpl implements
GridTaskSessionInternal {
}
/** {@inheritDoc} */
- @Override public GridTaskSessionInternal session() {
+ @Override public GridTaskSessionImpl session() {
return ses;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 4ae62e2d343..956396c2e39 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -41,6 +42,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeExecutionRejectedException;
+import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.events.DiscoveryEvent;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.GridJobSiblingsResponse;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.PlatformSecurityAwareJob;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
@@ -119,6 +122,10 @@ import static
org.apache.ignite.internal.processors.metric.GridMetricManager.CPU
import static
org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+import static
org.apache.ignite.internal.processors.security.SecurityUtils.unwrap;
+import static
org.apache.ignite.internal.processors.task.GridTaskProcessor.resolveTaskClass;
+import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL;
+import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
/**
@@ -787,51 +794,24 @@ public class GridJobProcessor extends
GridProcessorAdapter {
return;
}
- // Put either job ID or session ID (they are unique).
- cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
-
- Predicate<GridJobWorker> idsMatch = idMatch(sesId, jobId);
-
- // If we don't have jobId then we have to iterate
- if (jobId == null) {
- if (!jobAlwaysActivate) {
- for (GridJobWorker job : passiveJobs.values()) {
- if (idsMatch.test(job))
- cancelPassiveJob(job);
- }
- }
-
- for (GridJobWorker job : activeJobs.values()) {
- if (idsMatch.test(job))
- cancelActiveJob(job, sys);
- }
+ GridJobWorker jobWorker = findJobWorker(sesId, jobId);
- for (GridJobWorker job : syncRunningJobs.values()) {
- if (idsMatch.test(job))
- cancelJob(job, sys);
- }
- }
- else {
- if (!jobAlwaysActivate) {
- GridJobWorker passiveJob = passiveJobs.get(jobId);
-
- if (passiveJob != null && idsMatch.test(passiveJob) &&
cancelPassiveJob(passiveJob))
- return;
- }
+ if (jobWorker != null && !sys)
+ authorizeJobCancel(jobWorker);
- GridJobWorker activeJob = activeJobs.get(jobId);
-
- if (activeJob != null && idsMatch.test(activeJob)) {
- cancelActiveJob(activeJob, sys);
+ // Put either job ID or session ID (they are unique).
+ cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
- return;
- }
+ if (jobWorker == null)
+ return;
- activeJob = syncRunningJobs.get(jobId);
+ if (!cancelPassiveJob(jobWorker) && !cancelActiveJob(jobWorker,
sys))
+ cancelJob(jobWorker, sys);
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to cancel Ignite Compute Task Job [sesId=" +
sesId + ", jobId=" + jobId + ']', e);
- if (activeJob != null && idsMatch.test(activeJob))
- cancelJob(activeJob, sys);
- }
+ throw e;
}
finally {
rwLock.readUnlock();
@@ -845,9 +825,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @return {@code True} if succeeded.
*/
private boolean cancelPassiveJob(GridJobWorker job) {
- assert !jobAlwaysActivate;
-
- if (removeFromPassive(job)) {
+ if (!jobAlwaysActivate && removeFromPassive(job)) {
if (log.isDebugEnabled())
log.debug("Job has been cancelled before activation: " + job);
@@ -867,7 +845,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @param job Job to cancel.
* @param sys Flag indicating whether this is a system cancel.
*/
- private void cancelActiveJob(GridJobWorker job, boolean sys) {
+ private boolean cancelActiveJob(GridJobWorker job, boolean sys) {
if (removeFromActive(job)) {
cancelledJobs.put(job.getJobId(), job);
@@ -877,7 +855,11 @@ public class GridJobProcessor extends GridProcessorAdapter
{
else
// No reply, since it is not cancel from collision.
cancelJob(job, sys);
+
+ return true;
}
+
+ return false;
}
/**
@@ -1711,7 +1693,6 @@ public class GridJobProcessor extends
GridProcessorAdapter {
* @param nodeId Node ID.
* @param req Request.
*/
- @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest
req) {
if (!rwLock.tryReadLock()) {
if (log.isDebugEnabled())
@@ -2424,6 +2405,80 @@ public class GridJobProcessor extends
GridProcessorAdapter {
.collect(groupingBy(GridJobWorker::status, counting()));
}
+ /** */
+ private void authorizeJobCancel(GridJobWorker jobWorker) {
+ if (!ctx.security().enabled())
+ return;
+
+ GridTaskSessionImpl taskSes = jobWorker.getSession().session();
+
+ Class<?> taskCls = resolveTaskClass(taskSes.getTaskName(), null, null);
+
+ if (taskCls == null || !ctx.security().isSystemType(taskCls))
+ ctx.security().authorize(taskSes.getTaskName(), TASK_CANCEL);
+ else {
+ authorizeJobCancel(
+ jobWorker.getJob(),
+
Objects.equals(taskSes.initiatorSecurityContext().subject().id(),
ctx.security().securityContext().subject().id())
+ );
+ }
+ }
+
+ /** */
+ public void authorizeJobCancel(ComputeJob job, boolean
isCanceledByInitiator) {
+ Object executable = unwrap(job);
+
+ if (!ctx.security().isSystemType(executable.getClass()))
+ ctx.security().authorize(executable.getClass().getName(),
TASK_CANCEL);
+ else if (executable instanceof PlatformSecurityAwareJob)
+
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(),
TASK_CANCEL);
+ else if (!isCanceledByInitiator)
+ ctx.security().authorize(ADMIN_KILL);
+ }
+
+ /** */
+ private GridJobWorker findJobWorker(@Nullable final IgniteUuid sesId,
@Nullable final IgniteUuid jobId) {
+ Predicate<GridJobWorker> pred = idMatch(sesId, jobId);
+
+ GridJobWorker res = findJobWorker(passiveJobs, jobId, pred);
+
+ if (res == null) {
+ res = findJobWorker(activeJobs, jobId, pred);
+
+ if (res == null)
+ res = findJobWorker(syncRunningJobs, jobId, pred);
+ }
+
+ return res;
+ }
+
+ /** */
+ private GridJobWorker findJobWorker(
+ @Nullable Map<IgniteUuid, GridJobWorker> workers,
+ @Nullable IgniteUuid jobId,
+ Predicate<GridJobWorker> pred
+ ) {
+ if (workers == null)
+ return null;
+
+ if (jobId == null) {
+ for (GridJobWorker jobWorker : workers.values()) {
+ if (jobWorker != null && pred.test(jobWorker))
+ return jobWorker;
+ }
+
+ return null;
+ }
+ else {
+ GridJobWorker jobWorker = workers.get(jobId);
+
+ if (jobWorker != null && pred.test(jobWorker))
+ return jobWorker;
+ }
+
+ return null;
+ }
+
/**
* @param sesId Task session ID.
* @param jobId Job ID.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 54775dcb63a..ebcfc3ad5ee 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1599,7 +1599,7 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
* execution by its name, and corresponding to this name task class was
not found by the default classloader on
* the local node.
*/
- private Class<?> resolveTaskClass(@Nullable String taskName, @Nullable
Class<?> taskCls, @Nullable ComputeTask<?, ?> task) {
+ public static Class<?> resolveTaskClass(@Nullable String taskName,
@Nullable Class<?> taskCls, @Nullable ComputeTask<?, ?> task) {
if (taskCls != null)
return taskCls;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 0d0de757abd..16c91b86edf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -114,7 +114,6 @@ import static
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FIN
import static
org.apache.ignite.internal.processors.security.SecurityUtils.authorizeAll;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.unwrap;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
-import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL;
import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL;
import static
org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE;
@@ -1780,16 +1779,8 @@ public class GridTaskWorker<T, R> extends GridWorker
implements GridTimeoutObjec
ses.initiatorSecurityContext().subject().id(),
ctx.security().securityContext().subject().id());
- for (GridJobResultImpl jobRes : jobRes.values()) {
- Object executable = unwrap(jobRes.getJob());
-
- if (!ctx.security().isSystemType(executable.getClass()))
- ctx.security().authorize(executable.getClass().getName(),
TASK_CANCEL);
- else if (executable instanceof PlatformSecurityAwareJob)
-
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(),
TASK_CANCEL);
- else if (!isClosedByInitiator)
- ctx.security().authorize(ADMIN_KILL);
- }
+ for (GridJobResultImpl jobRes : jobRes.values())
+ ctx.job().authorizeJobCancel(jobRes.getJob(),
isClosedByInitiator);
}
}
}