This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb32a7 IGNITE-12300 Use initiating node security context in
ComputeJob.cancel - Fixes #7017.
1bb32a7 is described below
commit 1bb32a7d5a86f31f77bd2d729b083e1f75be78d9
Author: d.garus <[email protected]>
AuthorDate: Fri Nov 8 13:06:21 2019 +0300
IGNITE-12300 Use initiating node security context in ComputeJob.cancel -
Fixes #7017.
Signed-off-by: Ilya Kasnacheev <[email protected]>
---
.../internal/processors/job/GridJobWorker.java | 11 +-
.../AbstractRemoteSecurityContextCheckTest.java | 29 +--
...teTaskCancelRemoteSecurityContextCheckTest.java | 195 +++++++++++++++++++++
.../ignite/testsuites/SecurityTestSuite.java | 3 +
4 files changed, 225 insertions(+), 13 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index e6a72b5..90e54e5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -51,6 +51,8 @@ import
org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.processors.security.SecurityContext;
import
org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -174,6 +176,9 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
/** Request topology version. */
private final String execName;
+ /** Security context. */
+ private final SecurityContext secCtx;
+
/**
* @param ctx Kernal context.
* @param dep Grid deployment.
@@ -240,6 +245,8 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId);
taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId);
+
+ secCtx = ctx.security().securityContext();
}
/**
@@ -721,7 +728,9 @@ public class GridJobWorker extends GridWorker implements
GridTimeoutObject {
U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
@Override public void run() {
- job0.cancel();
+ try (OperationSecurityContext c =
ctx.security().withContext(secCtx)) {
+ job0.cancel();
+ }
}
});
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
index 05b348c..9acc07e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java
@@ -41,8 +41,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteRunnable;
import static org.apache.ignite.Ignition.localIgnite;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
/**
*
@@ -162,7 +160,7 @@ public abstract class
AbstractRemoteSecurityContextCheckTest extends AbstractSec
private UUID expSecSubjId;
/** */
- private Verifier clear() {
+ public Verifier clear() {
registeredSubjects.clear();
expInvokes.clear();
@@ -185,11 +183,16 @@ public abstract class
AbstractRemoteSecurityContextCheckTest extends AbstractSec
}
/**
- * Registers current security context and increments invoke's counter.
+ * Registers the security subject ID resolved for {@code localIgnite} and
increments the invoke counter.
*/
- public synchronized void register() {
- IgniteEx ignite = (IgniteEx)localIgnite();
+ public void register() {
+ register((IgniteEx)localIgnite());
+ }
+ /**
+ * Registers the security subject ID resolved for the passed {@code ignite}
and increments the invoke counter.
+ */
+ public synchronized void register(IgniteEx ignite) {
registeredSubjects.add(new T2<>(secSubjectId(ignite),
ignite.name()));
expInvokes.computeIfPresent(ignite.name(), (name, t2) -> {
@@ -204,15 +207,15 @@ public abstract class
AbstractRemoteSecurityContextCheckTest extends AbstractSec
/**
* Checks result of test and clears expected behavior.
*/
- private void checkResult() {
+ public void checkResult() {
registeredSubjects.forEach(t ->
- assertThat("Invalide security context on node " + t.get2(),
- t.get1(), is(expSecSubjId))
+ assertEquals("Invalide security context on node " + t.get2(),
+ expSecSubjId, t.get1())
);
expInvokes.forEach((key, value) ->
- assertThat("Node " + key + ". Execution of register: ",
- value.get2(), is(value.get1())));
+ assertEquals("Node " + key + ". Execution of register: ",
+ value.get1(), value.get2()));
clear();
}
@@ -225,8 +228,10 @@ public abstract class
AbstractRemoteSecurityContextCheckTest extends AbstractSec
}
/** */
- private void initiator(IgniteEx initiator) {
+ public Verifier initiator(IgniteEx initiator) {
expSecSubjId = secSubjectId(initiator);
+
+ return this;
}
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
new file mode 100644
index 0000000..15c342c
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security.compute.closure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgnitionEx;
+import
org.apache.ignite.internal.processors.security.AbstractRemoteSecurityContextCheckTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+/**
+ * Tests the operation security context when a compute task is canceled on a
remote node.
+ * <p>
+ * The initiator node sends a task to a remote node asynchronously.
+ * Then either {@code cancel} is called on the IgniteFuture or
+ * the remote node leaves the cluster; either event triggers the
'cancel' method of the ComputeJob.
+ * That method should be executed on the remote node with the initiator
+ * node's security context.
+ */
+public class ComputeTaskCancelRemoteSecurityContextCheckTest extends
AbstractRemoteSecurityContextCheckTest {
+ /** Cancel method was executed. */
+ private static final AtomicBoolean CANCELED = new AtomicBoolean();
+
+ /** Waiting timeout. */
+ private static final int TIMEOUT = 20_000;
+
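+ /** Two-party barrier: the test thread and the job's {@code execute} both await it before cancellation is triggered. */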
+ private static final CyclicBarrier BARRIER = new CyclicBarrier(2);
+
+ /** {@inheritDoc} */
+ @Override protected void setupVerifier(Verifier verifier) {
+ // No-op.
+ }
+
+ /**
+ * Initiator is server node. Remote is server node.
+ */
+ @Test
+ public void testSrvTaskInitatorCancelOnSrvNode() throws Exception {
+ prepareAndCheck(false, false);
+ }
+
+ /**
+ * Initiator is server node. Remote is client node.
+ */
+ @Test
+ public void testSrvTaskInitatorCancelOnClientNode() throws Exception {
+ prepareAndCheck(false, true);
+ }
+
+ /**
+ * Initiator is client node. Remote is server node.
+ */
+ @Test
+ public void testClientTaskInitatorCancelOnSrvNode() throws Exception {
+ prepareAndCheck(true, false);
+ }
+
+ /**
+ * Initiator is client node. Remote is client node.
+ */
+ @Test
+ public void testClientTaskInitatorCancelOnClientNode() throws Exception {
+ prepareAndCheck(true, true);
+ }
+
+ /**
+ * @param isClientInitiator The initiator node is client.
+ * @param isClientRmt The remote node is client.
+ */
+ private void prepareAndCheck(boolean isClientInitiator, boolean
isClientRmt) throws Exception {
+ try {
+ IgniteEx srv = startGridAllowAll("srv_init");
+
+ IgniteEx initator = isClientInitiator ?
startClientAllowAll("clnt_init") : srv;
+
+ IgniteEx rmt = isClientRmt ? startClientAllowAll("clnt_rmt") :
startGridAllowAll("srv_rmt");
+
+ srv.cluster().active(true);
+
+ //Checks the case when IgniteFuture#cancel is called.
+ checkCancel(initator, rmt, IgniteFuture::cancel);
+ //Checks the case when rmt node leaves the cluster.
+ checkCancel(initator, rmt, f -> IgnitionEx.stop(rmt.name(), true,
false));
+ }
+ finally {
+ G.stopAll(true);
+
+ cleanPersistenceDir();
+ }
+ }
+
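+ /** Starts the task on {@code rmt} from {@code initator}, triggers cancellation via {@code consumer}, then checks the verifier result. */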
+ private void checkCancel(IgniteEx initator, IgniteEx rmt,
Consumer<IgniteFuture> consumer) throws Exception {
+ VERIFIER
+ .clear()
+ .initiator(initator)
+ .expect(rmt.name(), 1);
+
+ BARRIER.reset();
+ CANCELED.set(false);
+
+ IgniteFuture fut = compute(initator,
Collections.singleton(rmt.localNode().id()))
+ .executeAsync(new TestComputeTask(), 0);
+
+ BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ consumer.accept(fut);
+
+ GridTestUtils.waitForCondition(CANCELED::get, TIMEOUT);
+
+ VERIFIER.checkResult();
+ }
+
+ /**
+ * Compute task for tests.
+ */
+ static class TestComputeTask implements ComputeTask<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode>
map(List<ClusterNode> subgrid,
+ Integer arg) throws IgniteException {
+ return Collections.singletonMap(
+ new ComputeJob() {
+ @IgniteInstanceResource
+ private Ignite loc;
+
+ @Override public void cancel() {
+ VERIFIER.register((IgniteEx)loc);
+
+ CANCELED.set(true);
+ }
+
+ @Override public Object execute() {
+ try {
+ BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ GridTestUtils.waitForCondition(() -> false,
TIMEOUT);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return null;
+ }
+ },
subgrid.stream().findFirst().orElseThrow(IllegalStateException::new)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) {
+ if (res.getException() != null)
+ throw res.getException();
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Integer reduce(List<ComputeJobResult>
results) {
+ return null;
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 5b7725e..a4d1815 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.internal.processors.security.cache.closure.EntryProcess
import
org.apache.ignite.internal.processors.security.cache.closure.ScanQueryRemoteSecurityContextCheckTest;
import
org.apache.ignite.internal.processors.security.client.ThinClientPermissionCheckTest;
import
org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
+import
org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskCancelRemoteSecurityContextCheckTest;
import
org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskRemoteSecurityContextCheckTest;
import
org.apache.ignite.internal.processors.security.compute.closure.DistributedClosureRemoteSecurityContextCheckTest;
import
org.apache.ignite.internal.processors.security.compute.closure.ExecutorServiceRemoteSecurityContextCheckTest;
@@ -47,12 +48,14 @@ import org.junit.runners.Suite;
DistributedClosureRemoteSecurityContextCheckTest.class,
ComputeTaskRemoteSecurityContextCheckTest.class,
+ ComputeTaskCancelRemoteSecurityContextCheckTest.class,
ExecutorServiceRemoteSecurityContextCheckTest.class,
ScanQueryRemoteSecurityContextCheckTest.class,
EntryProcessorRemoteSecurityContextCheckTest.class,
DataStreamerRemoteSecurityContextCheckTest.class,
CacheLoadRemoteSecurityContextCheckTest.class,
ThinClientPermissionCheckTest.class,
+
InvalidServerTest.class,
})
public class SecurityTestSuite {