This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a36d7d255c IGNITE-27196 Added authorization of Compute Job cancellation (#12542)
8a36d7d255c is described below

commit 8a36d7d255c1f8fc2296f7eda96b5cce554543df
Author: Mikhail Petrov <[email protected]>
AuthorDate: Wed Dec 3 11:01:22 2025 +0300

    IGNITE-27196 Added authorization of Compute Job cancellation (#12542)
---
 .../ignite/common/ComputeTaskPermissionsTest.java  | 176 ++++++++++++++++-----
 .../apache/ignite/internal/GridJobSessionImpl.java |   2 +-
 .../internal/processors/job/GridJobProcessor.java  | 145 +++++++++++------
 .../processors/task/GridTaskProcessor.java         |   2 +-
 .../internal/processors/task/GridTaskWorker.java   |  13 +-
 5 files changed, 243 insertions(+), 95 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index 390478bef79..02321a705a2 100644
--- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -40,8 +40,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.ConnectorConfiguration;
@@ -50,6 +53,7 @@ import 
org.apache.ignite.configuration.ThinClientConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
 import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
+import 
org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
 import org.apache.ignite.internal.processors.security.OperationSecurityContext;
 import org.apache.ignite.internal.processors.security.PublicAccessJob;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -71,6 +75,8 @@ import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.security.SecurityException;
 import org.apache.ignite.plugin.security.SecurityPermissionSet;
 import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
@@ -92,6 +98,7 @@ import static 
org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE;
 import static 
org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS;
 import static 
org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create;
 import static 
org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -111,10 +118,10 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
     private static final ComputeTask SYSTEM_TASK = new 
VerifyBackupPartitionsTask();
 
     /** */
-    private static final AtomicInteger EXECUTED_TASK_CNTR = new 
AtomicInteger();
+    private static final AtomicInteger EXECUTED_JOB_CNT = new AtomicInteger();
 
     /** */
-    private static final AtomicInteger CANCELLED_TASK_CNTR = new 
AtomicInteger();
+    private static final AtomicInteger CANCELLED_JOB_CNT = new AtomicInteger();
 
     /** */
     private static final String CACHE = DEFAULT_CACHE_NAME;
@@ -128,6 +135,9 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
     /** */
     public static CountDownLatch taskUnblockedLatch;
 
+    /** */
+    private static ListeningTestLogger listeningLog;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
@@ -138,6 +148,8 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
             PublicAccessSystemJob.class
         );
 
+        listeningLog = new ListeningTestLogger(log);
+
         for (int idx = 0; idx < SRV_NODES_CNT; idx++)
             startGrid(idx, false);
 
@@ -151,6 +163,15 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
         grid(0).createCache(CACHE);
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(
+        String instanceName,
+        AbstractTestSecurityPluginProvider pluginProv
+    ) throws Exception {
+        return super.getConfiguration(instanceName, pluginProv)
+            .setGridLogger(listeningLog);
+    }
+
     /** */
     private IgniteEx startGrid(int idx, boolean isClient) throws Exception {
         String login = getTestIgniteInstanceName(idx);
@@ -373,6 +394,82 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
         checkCallable(c -> executorService(initiator, 
executor).invokeAny(singletonList(c), getTestTimeout(), MILLISECONDS));
     }
 
+    /** */
+    @Test
+    public void testJobCancelAuthorizationSucceeded() throws Exception {
+        taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
+        taskUnblockedLatch = new CountDownLatch(1);
+
+        CANCELLED_JOB_CNT.set(0);
+        EXECUTED_JOB_CNT.set(0);
+
+        ComputeTaskFuture<Object> fut = 
grid(0).compute().executeAsync(CancelAllowedTask.class, null);
+
+        taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
+
+        for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings())
+            sibling.cancel();
+
+        fut.get(getTestTimeout());
+
+        assertTrue(waitForCondition(() -> SRV_NODES_CNT == 
CANCELLED_JOB_CNT.get(), getTestTimeout()));
+        assertEquals(0, EXECUTED_JOB_CNT.get());
+    }
+
+    /** */
+    @Test
+    public void testJobCancelAuthorizationFailed() throws Exception {
+        taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
+        taskUnblockedLatch = new CountDownLatch(1);
+
+        CANCELLED_JOB_CNT.set(0);
+        EXECUTED_JOB_CNT.set(0);
+
+        ComputeTaskFuture<Object> fut = 
grid(0).compute().executeAsync(CancelForbiddenTask.class, null);
+
+        taskStartedLatch.await(getTestTimeout(), MILLISECONDS);
+
+        for (ComputeJobSibling sibling : 
fut.getTaskSession().getJobSiblings()) {
+            LogListener logLsnr = LogListener.matches("Failed to cancel Ignite 
Compute Task Job" +
+                " [sesId=" + fut.getTaskSession().getId() +
+                ", jobId=" + sibling.getJobId() + ']'
+            ).build();
+
+            listeningLog.registerListener(logLsnr);
+
+            // TODO https://issues.apache.org/jira/browse/IGNITE-27195 
Authorization errors during Compute Job
+            //  cancellation do not propagate from remote nodes back to the 
one that initiated cancellation.
+            if (grid(0).context().job().activeJob(sibling.getJobId()) != null) 
{
+                assertThrowsAnyCause(
+                    log,
+                    () -> {
+                        sibling.cancel();
+
+                        return null;
+                    },
+                    SecurityException.class,
+                    "Authorization failed"
+                );
+            }
+            else
+                sibling.cancel();
+
+            logLsnr.check(getTestTimeout());
+        }
+
+        assertEquals(0, EXECUTED_JOB_CNT.get());
+        assertEquals(0, CANCELLED_JOB_CNT.get());
+
+        assertFalse(fut.isDone());
+
+        taskUnblockedLatch.countDown();
+
+        fut.get(getTestTimeout());
+
+        assertTrue(waitForCondition(() -> SRV_NODES_CNT == 
EXECUTED_JOB_CNT.get(), getTestTimeout()));
+        assertEquals(0, CANCELLED_JOB_CNT.get());
+    }
+
     /** */
     @Test
     public void testSystemTaskCancel() throws Exception {
@@ -420,42 +517,37 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
         taskStartedLatch = new CountDownLatch(expTaskCnt);
         taskUnblockedLatch = new CountDownLatch(1);
 
-        CANCELLED_TASK_CNTR.set(0);
-        EXECUTED_TASK_CNTR.set(0);
+        CANCELLED_JOB_CNT.set(0);
+        EXECUTED_JOB_CNT.set(0);
 
-        try {
-            Future<?> fut = taskStarter.get();
+        Future<?> fut = taskStarter.get();
 
-            assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
+        assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
 
-            try (
-                OperationSecurityContext ignored = initiator == null
-                    ? null
-                    : grid(0).context().security().withContext(initiator)
-            ) {
-                if (expE == null) {
-                    fut.cancel(true);
+        try (
+            OperationSecurityContext ignored = initiator == null
+                ? null
+                : grid(0).context().security().withContext(initiator)
+        ) {
+            if (expE == null) {
+                fut.cancel(true);
 
-                    assertTrue(fut.isCancelled());
+                assertTrue(fut.isCancelled());
 
-                    assertTrue(waitForCondition(() -> expTaskCnt == 
CANCELLED_TASK_CNTR.get(), getTestTimeout()));
+                assertTrue(waitForCondition(() -> expTaskCnt == 
CANCELLED_JOB_CNT.get(), getTestTimeout()));
 
-                    assertEquals(0, EXECUTED_TASK_CNTR.get());
-                }
-                else {
-                    assertThrowsWithCause(() -> fut.cancel(true), expE);
+                assertEquals(0, EXECUTED_JOB_CNT.get());
+            }
+            else {
+                assertThrowsWithCause(() -> fut.cancel(true), expE);
 
-                    assertFalse(fut.isCancelled());
+                assertFalse(fut.isCancelled());
 
-                    taskUnblockedLatch.countDown();
+                taskUnblockedLatch.countDown();
 
-                    assertTrue(waitForCondition(() -> expTaskCnt == 
EXECUTED_TASK_CNTR.get(), getTestTimeout()));
-                }
+                assertTrue(waitForCondition(() -> expTaskCnt == 
EXECUTED_JOB_CNT.get(), getTestTimeout()));
             }
         }
-        finally {
-            taskUnblockedLatch.countDown();
-        }
     }
 
     /** */
@@ -516,16 +608,16 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
 
     /** */
     private void assertCompleted(RunnableX r, int expCnt) {
-        EXECUTED_TASK_CNTR.set(0);
+        EXECUTED_JOB_CNT.set(0);
 
         r.run();
 
-        assertEquals(expCnt, EXECUTED_TASK_CNTR.get());
+        assertEquals(expCnt, EXECUTED_JOB_CNT.get());
     }
 
     /** */
     private void assertFailed(RunnableX r) {
-        EXECUTED_TASK_CNTR.set(0);
+        EXECUTED_JOB_CNT.set(0);
 
         try {
             r.run();
@@ -547,7 +639,7 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
 
         fail();
 
-        assertEquals(0, EXECUTED_TASK_CNTR.get());
+        assertEquals(0, EXECUTED_JOB_CNT.get());
     }
 
     /** */
@@ -629,7 +721,7 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
     private abstract static class AbstractRunnable implements IgniteRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
-            EXECUTED_TASK_CNTR.incrementAndGet();
+            EXECUTED_JOB_CNT.incrementAndGet();
         }
     }
 
@@ -637,7 +729,7 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
     private abstract static class AbstractCallable implements 
IgniteCallable<AtomicInteger> {
         /** {@inheritDoc} */
         @Override public AtomicInteger call() throws Exception {
-            EXECUTED_TASK_CNTR.incrementAndGet();
+            EXECUTED_JOB_CNT.incrementAndGet();
 
             return new AtomicInteger(0);
         }
@@ -647,7 +739,7 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
     private abstract static class AbstractClosure implements 
IgniteClosure<Boolean, Boolean> {
         /** {@inheritDoc} */
         @Override public Boolean apply(Boolean o) {
-            EXECUTED_TASK_CNTR.incrementAndGet();
+            EXECUTED_JOB_CNT.incrementAndGet();
 
             return null;
         }
@@ -681,9 +773,19 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
 
         /** {@inheritDoc} */
         @Override public @Nullable Object reduce(List<ComputeJobResult> 
results) throws IgniteException {
+            for (ComputeJobResult res : results) {
+                if (!res.isCancelled() && res.getException() != null)
+                    throw res.getException();
+            }
+
             return null;
         }
 
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd) throws IgniteException {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
         /** */
         protected ComputeJob job() {
             return new TestJob();
@@ -707,7 +809,7 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
 
         /** {@inheritDoc} */
         @Override public Object execute() {
-            EXECUTED_TASK_CNTR.incrementAndGet();
+            EXECUTED_JOB_CNT.incrementAndGet();
 
             return null;
         }
@@ -725,14 +827,14 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
                 taskUnblockedLatch.await(5_000, MILLISECONDS);
             }
             catch (InterruptedException e) {
-                CANCELLED_TASK_CNTR.incrementAndGet();
+                CANCELLED_JOB_CNT.incrementAndGet();
 
                 Thread.currentThread().interrupt();
 
                 throw new IgniteException(e);
             }
 
-            EXECUTED_TASK_CNTR.incrementAndGet();
+            EXECUTED_JOB_CNT.incrementAndGet();
 
             return null;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
index 5d98789420c..219f424f3c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
@@ -73,7 +73,7 @@ public class GridJobSessionImpl implements 
GridTaskSessionInternal {
     }
 
     /** {@inheritDoc} */
-    @Override public GridTaskSessionInternal session() {
+    @Override public GridTaskSessionImpl session() {
         return ses;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 4ae62e2d343..956396c2e39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -41,6 +42,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeExecutionRejectedException;
+import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -57,6 +59,7 @@ import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.PlatformSecurityAwareJob;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
 import org.apache.ignite.internal.managers.collision.GridCollisionManager;
@@ -119,6 +122,10 @@ import static 
org.apache.ignite.internal.processors.metric.GridMetricManager.CPU
 import static 
org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+import static 
org.apache.ignite.internal.processors.security.SecurityUtils.unwrap;
+import static 
org.apache.ignite.internal.processors.task.GridTaskProcessor.resolveTaskClass;
+import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL;
+import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
@@ -787,51 +794,24 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
                 return;
             }
 
-            // Put either job ID or session ID (they are unique).
-            cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
-
-            Predicate<GridJobWorker> idsMatch = idMatch(sesId, jobId);
-
-            // If we don't have jobId then we have to iterate
-            if (jobId == null) {
-                if (!jobAlwaysActivate) {
-                    for (GridJobWorker job : passiveJobs.values()) {
-                        if (idsMatch.test(job))
-                            cancelPassiveJob(job);
-                    }
-                }
-
-                for (GridJobWorker job : activeJobs.values()) {
-                    if (idsMatch.test(job))
-                        cancelActiveJob(job, sys);
-                }
+            GridJobWorker jobWorker = findJobWorker(sesId, jobId);
 
-                for (GridJobWorker job : syncRunningJobs.values()) {
-                    if (idsMatch.test(job))
-                        cancelJob(job, sys);
-                }
-            }
-            else {
-                if (!jobAlwaysActivate) {
-                    GridJobWorker passiveJob = passiveJobs.get(jobId);
-
-                    if (passiveJob != null && idsMatch.test(passiveJob) && 
cancelPassiveJob(passiveJob))
-                        return;
-                }
+            if (jobWorker != null && !sys)
+                authorizeJobCancel(jobWorker);
 
-                GridJobWorker activeJob = activeJobs.get(jobId);
-
-                if (activeJob != null && idsMatch.test(activeJob)) {
-                    cancelActiveJob(activeJob, sys);
+            // Put either job ID or session ID (they are unique).
+            cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
 
-                    return;
-                }
+            if (jobWorker == null)
+                return;
 
-                activeJob = syncRunningJobs.get(jobId);
+            if (!cancelPassiveJob(jobWorker) && !cancelActiveJob(jobWorker, 
sys))
+                cancelJob(jobWorker, sys);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to cancel Ignite Compute Task Job [sesId=" + 
sesId + ", jobId=" + jobId + ']', e);
 
-                if (activeJob != null && idsMatch.test(activeJob))
-                    cancelJob(activeJob, sys);
-            }
+            throw e;
         }
         finally {
             rwLock.readUnlock();
@@ -845,9 +825,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @return {@code True} if succeeded.
      */
     private boolean cancelPassiveJob(GridJobWorker job) {
-        assert !jobAlwaysActivate;
-
-        if (removeFromPassive(job)) {
+        if (!jobAlwaysActivate && removeFromPassive(job)) {
             if (log.isDebugEnabled())
                 log.debug("Job has been cancelled before activation: " + job);
 
@@ -867,7 +845,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @param job Job to cancel.
      * @param sys Flag indicating whether this is a system cancel.
      */
-    private void cancelActiveJob(GridJobWorker job, boolean sys) {
+    private boolean cancelActiveJob(GridJobWorker job, boolean sys) {
         if (removeFromActive(job)) {
             cancelledJobs.put(job.getJobId(), job);
 
@@ -877,7 +855,11 @@ public class GridJobProcessor extends GridProcessorAdapter 
{
             else
                 // No reply, since it is not cancel from collision.
                 cancelJob(job, sys);
+
+            return true;
         }
+
+        return false;
     }
 
     /**
@@ -1711,7 +1693,6 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
     private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest 
req) {
         if (!rwLock.tryReadLock()) {
             if (log.isDebugEnabled())
@@ -2424,6 +2405,80 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
             .collect(groupingBy(GridJobWorker::status, counting()));
     }
 
+    /** */
+    private void authorizeJobCancel(GridJobWorker jobWorker) {
+        if (!ctx.security().enabled())
+            return;
+
+        GridTaskSessionImpl taskSes = jobWorker.getSession().session();
+
+        Class<?> taskCls = resolveTaskClass(taskSes.getTaskName(), null, null);
+
+        if (taskCls == null || !ctx.security().isSystemType(taskCls))
+            ctx.security().authorize(taskSes.getTaskName(), TASK_CANCEL);
+        else {
+            authorizeJobCancel(
+                jobWorker.getJob(),
+                
Objects.equals(taskSes.initiatorSecurityContext().subject().id(), 
ctx.security().securityContext().subject().id())
+            );
+        }
+    }
+
+    /** */
+    public void authorizeJobCancel(ComputeJob job, boolean 
isCanceledByInitiator) {
+        Object executable = unwrap(job);
+
+        if (!ctx.security().isSystemType(executable.getClass()))
+            ctx.security().authorize(executable.getClass().getName(), 
TASK_CANCEL);
+        else if (executable instanceof PlatformSecurityAwareJob)
+            
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(), 
TASK_CANCEL);
+        else if (!isCanceledByInitiator)
+            ctx.security().authorize(ADMIN_KILL);
+    }
+
+    /** */
+    private GridJobWorker findJobWorker(@Nullable final IgniteUuid sesId, 
@Nullable final IgniteUuid jobId) {
+        Predicate<GridJobWorker> pred = idMatch(sesId, jobId);
+
+        GridJobWorker res = findJobWorker(passiveJobs, jobId, pred);
+
+        if (res == null) {
+            res = findJobWorker(activeJobs, jobId, pred);
+
+            if (res == null)
+                res = findJobWorker(syncRunningJobs, jobId, pred);
+        }
+
+        return res;
+    }
+
+    /** */
+    private GridJobWorker findJobWorker(
+        @Nullable Map<IgniteUuid, GridJobWorker> workers,
+        @Nullable IgniteUuid jobId,
+        Predicate<GridJobWorker> pred
+    ) {
+        if (workers == null)
+            return null;
+
+        if (jobId == null) {
+            for (GridJobWorker jobWorker : workers.values()) {
+                if (jobWorker != null && pred.test(jobWorker))
+                    return jobWorker;
+            }
+
+            return null;
+        }
+        else {
+            GridJobWorker jobWorker = workers.get(jobId);
+
+            if (jobWorker != null && pred.test(jobWorker))
+                return jobWorker;
+        }
+
+        return null;
+    }
+
     /**
      * @param sesId Task session ID.
      * @param jobId Job ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 54775dcb63a..ebcfc3ad5ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1599,7 +1599,7 @@ public class GridTaskProcessor extends 
GridProcessorAdapter implements IgniteCha
      * execution by its name, and  corresponding to this name task class was 
not found by the default classloader on
      * the local node.
      */
-    private Class<?> resolveTaskClass(@Nullable String taskName, @Nullable 
Class<?> taskCls, @Nullable ComputeTask<?, ?> task) {
+    public static Class<?> resolveTaskClass(@Nullable String taskName, 
@Nullable Class<?> taskCls, @Nullable ComputeTask<?, ?> task) {
         if (taskCls != null)
             return taskCls;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 0d0de757abd..16c91b86edf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -114,7 +114,6 @@ import static 
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.FIN
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.authorizeAll;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.unwrap;
 import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
-import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL;
 import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL;
 import static 
org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE;
 
@@ -1780,16 +1779,8 @@ public class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObjec
                 ses.initiatorSecurityContext().subject().id(),
                 ctx.security().securityContext().subject().id());
 
-            for (GridJobResultImpl jobRes : jobRes.values()) {
-                Object executable = unwrap(jobRes.getJob());
-
-                if (!ctx.security().isSystemType(executable.getClass()))
-                    ctx.security().authorize(executable.getClass().getName(), 
TASK_CANCEL);
-                else if (executable instanceof PlatformSecurityAwareJob)
-                    
ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(), 
TASK_CANCEL);
-                else if (!isClosedByInitiator)
-                    ctx.security().authorize(ADMIN_KILL);
-            }
+            for (GridJobResultImpl jobRes : jobRes.values())
+                ctx.job().authorizeJobCancel(jobRes.getJob(), 
isClosedByInitiator);
         }
     }
 }

Reply via email to