This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch ignite-12300 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 67394f8e28998b6d29795f7b03238dfc51bc9449 Author: d.garus <[email protected]> AuthorDate: Thu Nov 7 14:42:28 2019 +0300 IGNITE-12300 Use initiating node security context in ComputeJob.cancel - Fixes #7017. Signed-off-by: Ilya Kasnacheev <[email protected]> --- .../internal/processors/job/GridJobWorker.java | 11 +- .../AbstractRemoteSecurityContextCheckTest.java | 29 +-- ...teTaskCancelRemoteSecurityContextCheckTest.java | 201 +++++++++++++++++++++ .../ignite/testsuites/SecurityTestSuite.java | 2 + 4 files changed, 230 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index e6a72b5..90e54e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -51,6 +51,8 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.security.OperationSecurityContext; +import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; @@ -174,6 +176,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { /** Request topology version. */ private final String execName; + /** Security context. */ + private final SecurityContext secCtx; + /** * @param ctx Kernal context. * @param dep Grid deployment. 
@@ -240,6 +245,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId); taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId); + + secCtx = ctx.security().securityContext(); } /** @@ -721,7 +728,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() { @Override public void run() { - job0.cancel(); + try (OperationSecurityContext c = ctx.security().withContext(secCtx)) { + job0.cancel(); + } } }); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java index 05b348c..9acc07e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/AbstractRemoteSecurityContextCheckTest.java @@ -41,8 +41,6 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteRunnable; import static org.apache.ignite.Ignition.localIgnite; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; /** * @@ -162,7 +160,7 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec private UUID expSecSubjId; /** */ - private Verifier clear() { + public Verifier clear() { registeredSubjects.clear(); expInvokes.clear(); @@ -185,11 +183,16 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec } /** - * Registers current security context and increments invoke's counter. + * Registers a security subject referred for {@code localIgnite} and increments invoke counter. 
*/ - public synchronized void register() { - IgniteEx ignite = (IgniteEx)localIgnite(); + public void register() { + register((IgniteEx)localIgnite()); + } + /** + * Registers a security subject referred for the passed {@code ignite} and increments invoke counter. + */ + public synchronized void register(IgniteEx ignite) { registeredSubjects.add(new T2<>(secSubjectId(ignite), ignite.name())); expInvokes.computeIfPresent(ignite.name(), (name, t2) -> { @@ -204,15 +207,15 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec /** * Checks result of test and clears expected behavior. */ - private void checkResult() { + public void checkResult() { registeredSubjects.forEach(t -> - assertThat("Invalide security context on node " + t.get2(), - t.get1(), is(expSecSubjId)) + assertEquals("Invalide security context on node " + t.get2(), + expSecSubjId, t.get1()) ); expInvokes.forEach((key, value) -> - assertThat("Node " + key + ". Execution of register: ", - value.get2(), is(value.get1()))); + assertEquals("Node " + key + ". 
Execution of register: ", + value.get1(), value.get2())); clear(); } @@ -225,8 +228,10 @@ public abstract class AbstractRemoteSecurityContextCheckTest extends AbstractSec } /** */ - private void initiator(IgniteEx initiator) { + public Verifier initiator(IgniteEx initiator) { expSecSubjId = secSubjectId(initiator); + + return this; } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java new file mode 100644 index 0000000..90641d5cf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/compute/closure/ComputeTaskCancelRemoteSecurityContextCheckTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.security.compute.closure; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.security.AbstractRemoteSecurityContextCheckTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +/** + * Tests the operation security context when a compute task is canceled on a remote node. + * <p> + * The initiator node sends a task to a remote node asynchronously. + * Then either the method {@code cancel} is called on the IgniteFuture, or + * the remote node leaves the cluster; either event causes the 'cancel' method of the ComputeJob to be called. + * This method should be executed on the remote node with the initiator + * node security context. + */ +public class ComputeTaskCancelRemoteSecurityContextCheckTest extends AbstractRemoteSecurityContextCheckTest { + /** Cancel action was called. */ + private static final AtomicBoolean CANCEL_CALLED = new AtomicBoolean(); + + /** Cancel method was executed. */ + private static final AtomicBoolean CANCELED = new AtomicBoolean(); + + /** Waiting timeout. 
*/ + private static final int TIMEOUT = 20_000; + + /** */ + private static final CyclicBarrier BARRIER = new CyclicBarrier(2); + + /** {@inheritDoc} */ + @Override protected void setupVerifier(Verifier verifier) { + // No-op. + } + + /** + * Initiator is server node. Remote is server node. + */ + @Test + public void testSrvTaskInitatorCancelOnSrvNode() throws Exception { + prepareAndCheck(false, false); + } + + /** + * Initiator is server node. Remote is client node. + */ + @Test + public void testSrvTaskInitatorCancelOnClientNode() throws Exception { + prepareAndCheck(false, true); + } + + /** + * Initiator is client node. Remote is server node. + */ + @Test + public void testClientTaskInitatorCancelOnSrvNode() throws Exception { + prepareAndCheck(true, false); + } + + /** + * Initiator is client node. Remote is client node. + */ + @Test + public void testClientTaskInitatorCancelOnClientNode() throws Exception { + prepareAndCheck(true, true); + } + + /** + * @param isClientInitiator The initiator node is client. + * @param isClientRmt The remote node is client. + */ + private void prepareAndCheck(boolean isClientInitiator, boolean isClientRmt) throws Exception { + try { + IgniteEx srv = startGridAllowAll("srv_init"); + + IgniteEx initator = isClientInitiator ? startClientAllowAll("clnt_init") : srv; + + IgniteEx rmt = isClientRmt ? startClientAllowAll("clnt_rmt") : startGridAllowAll("srv_rmt"); + + srv.cluster().active(true); + + //Checks the case when IgniteFuture#cancel is called. + checkCancel(initator, rmt, IgniteFuture::cancel); + //Checks the case when rmt node leaves the cluster. 
+ checkCancel(initator, rmt, f -> IgnitionEx.stop(rmt.name(), true, false)); + } + finally { + G.stopAll(true); + + cleanPersistenceDir(); + } + } + + /** */ + private void checkCancel(IgniteEx initator, IgniteEx rmt, Consumer<IgniteFuture> consumer) throws Exception { + VERIFIER + .clear() + .initiator(initator) + .expect(rmt.name(), 1); + + BARRIER.reset(); + CANCEL_CALLED.set(false); + CANCELED.set(false); + + IgniteFuture fut = compute(initator, Collections.singleton(rmt.localNode().id())) + .executeAsync(new TestComputeTask(), 0); + + BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS); + + consumer.accept(fut); + + CANCEL_CALLED.set(true); + + GridTestUtils.waitForCondition(CANCELED::get, TIMEOUT); + + VERIFIER.checkResult(); + } + + /** + * Compute task for tests. + */ + static class TestComputeTask implements ComputeTask<Integer, Integer> { + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + Integer arg) throws IgniteException { + return Collections.singletonMap( + new ComputeJob() { + @IgniteInstanceResource + private Ignite loc; + + @Override public void cancel() { + VERIFIER.register((IgniteEx)loc); + + CANCELED.set(true); + } + + @Override public Object execute() { + try { + BARRIER.await(TIMEOUT, TimeUnit.MILLISECONDS); + + GridTestUtils.waitForCondition(CANCEL_CALLED::get, TIMEOUT); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + return null; + } + }, subgrid.stream().findFirst().orElseThrow(IllegalStateException::new) + ); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + if (res.getException() != null) + throw res.getException(); + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Override public @Nullable Integer reduce(List<ComputeJobResult> results) { + return null; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java index 5b7725e..94593ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.security.cache.closure.EntryProcess import org.apache.ignite.internal.processors.security.cache.closure.ScanQueryRemoteSecurityContextCheckTest; import org.apache.ignite.internal.processors.security.client.ThinClientPermissionCheckTest; import org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest; +import org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskCancelRemoteSecurityContextCheckTest; import org.apache.ignite.internal.processors.security.compute.closure.ComputeTaskRemoteSecurityContextCheckTest; import org.apache.ignite.internal.processors.security.compute.closure.DistributedClosureRemoteSecurityContextCheckTest; import org.apache.ignite.internal.processors.security.compute.closure.ExecutorServiceRemoteSecurityContextCheckTest; @@ -47,6 +48,7 @@ import org.junit.runners.Suite; DistributedClosureRemoteSecurityContextCheckTest.class, ComputeTaskRemoteSecurityContextCheckTest.class, + ComputeTaskCancelRemoteSecurityContextCheckTest.class, ExecutorServiceRemoteSecurityContextCheckTest.class, ScanQueryRemoteSecurityContextCheckTest.class, EntryProcessorRemoteSecurityContextCheckTest.class,
