This is an automated email from the ASF dual-hosted git repository.

ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb32a7  IGNITE-12300 Use initiating node security context in 
ComputeJob.cancel - Fixes #7017.
1bb32a7 is described below

commit 1bb32a7d5a86f31f77bd2d729b083e1f75be78d9
Author: d.garus <[email protected]>
AuthorDate: Fri Nov 8 13:06:21 2019 +0300

    IGNITE-12300 Use initiating node security context in ComputeJob.cancel - 
Fixes #7017.
    
    Signed-off-by: Ilya Kasnacheev <[email protected]>
---
 .../internal/processors/job/GridJobWorker.java     |  11 +-
 .../AbstractRemoteSecurityContextCheckTest.java    |  29 +--
 ...teTaskCancelRemoteSecurityContextCheckTest.java | 195 +++++++++++++++++++++
 .../ignite/testsuites/SecurityTestSuite.java       |   3 +
 4 files changed, 225 insertions(+), 13 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index e6a72b5..90e54e5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -51,6 +51,8 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.processors.security.SecurityContext;
 import 
org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -174,6 +176,9 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
     /** Request topology version. */
     private final String execName;
 
+    /** Security context. */
+    private final SecurityContext secCtx;
+
     /**
      * @param ctx Kernal context.
      * @param dep Grid deployment.
@@ -240,6 +245,8 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
 
         jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId);
         taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId);
+
+        secCtx = ctx.security().securityContext();
     }
 
     /**
@@ -721,7 +728,9 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
 
                 U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
                     @Override public void run() {
-                        job0.cancel();
+                        try (OperationSecurityContext c = 
ctx.security().withContext(secCtx)) {
+                            job0.cancel();
+                        }
                     }
                 });
             }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
index 05b348c..9acc07e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
@@ -41,8 +41,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 
 import static org.apache.ignite.Ignition.localIgnite;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
 
 /**
  *
@@ -162,7 +160,7 @@ public abstract class 
AbstractRemoteSecurityContextCheckTest extends AbstractSec
         private UUID expSecSubjId;
 
         /** */
-        private Verifier clear() {
+        public Verifier clear() {
             registeredSubjects.clear();
             expInvokes.clear();
 
@@ -185,11 +183,16 @@ public abstract class 
AbstractRemoteSecurityContextCheckTest extends AbstractSec
         }
 
         /**
-         * Registers current security context and increments invoke's counter.
+         * Registers a security subject referred for {@code localIgnite} and 
increments invoke counter.
          */
-        public synchronized void register() {
-            IgniteEx ignite = (IgniteEx)localIgnite();
+        public void register() {
+            register((IgniteEx)localIgnite());
+        }
 
+        /**
+         * Registers a security subject referred for the passed {@code ignite} 
and increments invoke counter.
+         */
+        public synchronized void register(IgniteEx ignite) {
             registeredSubjects.add(new T2<>(secSubjectId(ignite), 
ignite.name()));
 
             expInvokes.computeIfPresent(ignite.name(), (name, t2) -> {
@@ -204,15 +207,15 @@ public abstract class 
AbstractRemoteSecurityContextCheckTest extends AbstractSec
         /**
          * Checks result of test and clears expected behavior.
          */
-        private void checkResult() {
+        public void checkResult() {
             registeredSubjects.forEach(t ->
-                assertThat("Invalide security context on node " + t.get2(),
-                    t.get1(), is(expSecSubjId))
+                assertEquals("Invalide security context on node " + t.get2(),
+                    expSecSubjId, t.get1())
             );
 
             expInvokes.forEach((key, value) ->
-                assertThat("Node " + key + ". Execution of register: ",
-                    value.get2(), is(value.get1())));
+                assertEquals("Node " + key + ". Execution of register: ",
+                    value.get1(), value.get2()));
 
             clear();
         }
@@ -225,8 +228,10 @@ public abstract class 
AbstractRemoteSecurityContextCheckTest extends AbstractSec
         }
 
         /** */
-        private void initiator(IgniteEx initiator) {
+        public Verifier initiator(IgniteEx initiator) {
             expSecSubjId = secSubjectId(initiator);
+
+            return this;
         }
 
         /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..15c342c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.compute.closure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgnitionEx;
+import 
org.apache.ignite.internal.processors.security.AbstractRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+/**
+ * Testing operation security context when the compute task is canceled on 
remote node.
+ * <p>
+ * The initiator node send a task on a remote node asynchronously.
+ * The method {@code cancel} is called on IgniteFuture or
+ * the remote node leaves the cluster that is the reason for calling the 
'cancel' method of a ComputeJob.
+ * This method should be executed on the remote node with the initiator
+ * node security context.
+ */
+public class ComputeTaskCancelRemoteSecurityContextCheckTest extends 
AbstractRemoteSecurityContextCheckTest {
+    /** Cancel method was executed. */
+    private static final AtomicBoolean CANCELED = new AtomicBoolean();
+
+    /** Waiting timeout. */
+    private static final int TIMEOUT = 20_000;
+
+    /** */
+    private static final CyclicBarrier BARRIER = new CyclicBarrier(2);
+
+    /** {@inheritDoc} */
+    @Override protected void setupVerifier(Verifier verifier) {
+        // No-op.
+    }
+
+    /**
+     * Initiator is server node. Remote is server node.
+     */
+    @Test
+    public void testSrvTaskInitatorCancelOnSrvNode() throws Exception {
+        prepareAndCheck(false, false);
+    }
+
+    /**
+     * Initiator is server node. Remote is client node.
+     */
+    @Test
+    public void testSrvTaskInitatorCancelOnClientNode() throws Exception {
+        prepareAndCheck(false, true);
+    }
+
+    /**
+     * Initiator is client node. Remote is server node.
+     */
+    @Test
+    public void testClientTaskInitatorCancelOnSrvNode() throws Exception {
+        prepareAndCheck(true, false);
+    }
+
+    /**
+     * Initiator is client node. Remote is client node.
+     */
+    @Test
+    public void testClientTaskInitatorCancelOnClientNode() throws Exception {
+        prepareAndCheck(true, true);
+    }
+
+    /**
+     * @param isClientInitiator The initiator node is client.
+     * @param isClientRmt The remote node is client.
+     */
+    private void prepareAndCheck(boolean isClientInitiator, boolean 
isClientRmt) throws Exception {
+        try {
+            IgniteEx srv = startGridAllowAll("srv_init");
+
+            IgniteEx initator = isClientInitiator ? 
startClientAllowAll("clnt_init") : srv;
+
+            IgniteEx rmt = isClientRmt ? startClientAllowAll("clnt_rmt") : 
startGridAllowAll("srv_rmt");
+
+            srv.cluster().active(true);
+
+            //Checks the case when IgniteFuture#cancel is called.
+            checkCancel(initator, rmt, IgniteFuture::cancel);
+            //Checks the case when rmt node leaves the cluster.
+            checkCancel(initator, rmt, f -> IgnitionEx.stop(rmt.name(), true, 
false));
+        }
+        finally {
+            G.stopAll(true);
+
+            cleanPersistenceDir();
+        }
+    }
+
+    /** */
+    private void checkCancel(IgniteEx initator, IgniteEx rmt, 
Consumer<IgniteFuture> consumer) throws Exception {
+        VERIFIER
+            .clear()
+            .initiator(initator)
+            .expect(rmt.name(), 1);
+
+        BARRIER.reset();
+        CANCELED.set(false);
+
+        IgniteFuture fut = compute(initator, 
Collections.singleton(rmt.localNode().id()))
+            .executeAsync(new TestComputeTask(), 0);
+
+        BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+        consumer.accept(fut);
+
+        GridTestUtils.waitForCondition(CANCELED::get, TIMEOUT);
+
+        VERIFIER.checkResult();
+    }
+
+    /**
+     * Compute task for tests.
+     */
+    static class TestComputeTask implements ComputeTask<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
+            Integer arg) throws IgniteException {
+            return Collections.singletonMap(
+                new ComputeJob() {
+                    @IgniteInstanceResource
+                    private Ignite loc;
+
+                    @Override public void cancel() {
+                        VERIFIER.register((IgniteEx)loc);
+
+                        CANCELED.set(true);
+                    }
+
+                    @Override public Object execute() {
+                        try {
+                            BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+                            GridTestUtils.waitForCondition(() -> false, 
TIMEOUT);
+                        }
+                        catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+
+                        return null;
+                    }
+                }, 
subgrid.stream().findFirst().orElseThrow(IllegalStateException::new)
+            );
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd) {
+            if (res.getException() != null)
+                throw res.getException();
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Integer reduce(List<ComputeJobResult> 
results) {
+            return null;
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 5b7725e..a4d1815 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.processors.security.cache.closure.EntryProcess
 import 
org.apache.ignite.internal.processors.security.cache.closure.ScanQueryRemoteSecurityContextCheckTest;
 import 
org.apache.ignite.internal.processors.security.client.ThinClientPermissionCheckTest;
 import 
org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
+import 
org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskCancelRemoteSecurityContextCheckTest;
 import 
org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskRemoteSecurityContextCheckTest;
 import 
org.apache.ignite.internal.processors.security.compute.closure.DistributedClosureRemoteSecurityContextCheckTest;
 import 
org.apache.ignite.internal.processors.security.compute.closure.ExecutorServiceRemoteSecurityContextCheckTest;
@@ -47,12 +48,14 @@ import org.junit.runners.Suite;
 
     DistributedClosureRemoteSecurityContextCheckTest.class,
     ComputeTaskRemoteSecurityContextCheckTest.class,
+    ComputeTaskCancelRemoteSecurityContextCheckTest.class,
     ExecutorServiceRemoteSecurityContextCheckTest.class,
     ScanQueryRemoteSecurityContextCheckTest.class,
     EntryProcessorRemoteSecurityContextCheckTest.class,
     DataStreamerRemoteSecurityContextCheckTest.class,
     CacheLoadRemoteSecurityContextCheckTest.class,
     ThinClientPermissionCheckTest.class,
+
     InvalidServerTest.class,
 })
 public class SecurityTestSuite {

Reply via email to