This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 17928fe8b7d IGNITE-19410 Fixed node crash due to SecurityContext not
being found during discovery message processing. (#10701)
17928fe8b7d is described below
commit 17928fe8b7d8c02c29c2a8360f8a3ada479d6a2d
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu May 25 11:25:05 2023 +0300
IGNITE-19410 Fixed node crash due to SecurityContext not being found during
discovery message processing. (#10701)
---
.../managers/discovery/GridDiscoveryManager.java | 21 +-
.../security/IgniteSecurityProcessor.java | 40 +-
.../spi/discovery/DiscoveryNotification.java | 8 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 9 +-
.../security/IgniteSecurityProcessorTest.java | 77 ++-
.../NodeSecurityContextPropagationTest.java | 551 +++++++++++++++++++++
.../security/client/ClientReconnectTest.java | 5 -
.../ignite/testsuites/SecurityTestSuite.java | 4 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 16 +-
9 files changed, 667 insertions(+), 64 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2a733d942b8..b5dd27c0ee8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -258,7 +259,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE);
/** Topology snapshots history. */
- private volatile Map<Long, Collection<ClusterNode>> topHist = new
HashMap<>();
+ private volatile NavigableMap<Long, Collection<ClusterNode>> topHist =
Collections.emptyNavigableMap();
/** Topology version. */
private final AtomicReference<Snapshot> topSnap =
@@ -606,7 +607,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
final ClusterNode locNode = localNode();
if (notification.getTopHist() != null)
- topHist = notification.getTopHist();
+ topHist =
Collections.unmodifiableNavigableMap(notification.getTopHist());
boolean verChanged;
@@ -844,7 +845,7 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
assert rmvd != null : histVer;
}
- topHist.clear();
+ topHist = Collections.emptyNavigableMap();
topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
createDiscoCache(AffinityTopologyVersion.ZERO,
ctx.state().clusterState(), locNode,
@@ -2671,11 +2672,25 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
* @return resolved node, or <code>null</code> if node not found.
*/
public ClusterNode historicalNode(UUID nodeId) {
+ long lastCheckedLocTopVer = Long.MAX_VALUE;
+
for (DiscoCache discoCache : discoCacheHist.descendingValues()) {
ClusterNode node = discoCache.node(nodeId);
if (node != null)
return node;
+
+ lastCheckedLocTopVer = discoCache.version().topologyVersion();
+ }
+
+ // We did not find node with given ID in the discovery history of the
local node. This means that the local
+ // node could join the cluster after the node with given ID left it.
Let's check in the global topology history,
+ // which contains all topology versions since the cluster was started.
+ for (Collection<ClusterNode> top :
topHist.headMap(lastCheckedLocTopVer, false).descendingMap().values()) {
+ for (ClusterNode node : top) {
+ if (F.eq(node.id(), nodeId))
+ return node;
+ }
}
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
index 11cd00b8cdf..0840426667f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.security;
import java.security.Security;
import java.util.Collection;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +45,7 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.jetbrains.annotations.Nullable;
+import static java.util.Optional.ofNullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static
org.apache.ignite.internal.processors.security.SecurityUtils.IGNITE_INTERNAL_PACKAGE;
@@ -148,21 +148,13 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
/** {@inheritDoc} */
@Override public OperationSecurityContext withContext(UUID subjId) {
try {
- ClusterNode node =
Optional.ofNullable(ctx.discovery().node(subjId))
- .orElseGet(() -> ctx.discovery().historicalNode(subjId));
-
- SecurityContext res;
-
- if (node == null)
- res = secPrc.securityContext(subjId);
- else if (dfltSecCtx.subject().id().equals(subjId))
- res = dfltSecCtx;
- else
- res = secCtxs.computeIfAbsent(subjId, uuid ->
nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node));
+ SecurityContext res = secPrc.securityContext(subjId);
if (res == null) {
- throw new IllegalStateException("Failed to find security
context " +
- "for subject with given ID : " + subjId);
+ res = findNodeSecurityContext(subjId);
+
+ if (res == null)
+ throw new IllegalStateException("Failed to find security
context for subject with given ID : " + subjId);
}
return withContext(res);
@@ -174,6 +166,26 @@ public class IgniteSecurityProcessor extends
IgniteSecurityAdapter {
}
}
+ /**
+ * Looks for a node which ID is equal to the given Subject ID. If such a
node exists, returns the Security Context
+ * that belongs to it. Otherwise {@code null}.
+ */
+ @Nullable private SecurityContext findNodeSecurityContext(UUID subjId) {
+ SecurityContext locNodeSecCtx = dfltSecCtx;
+
+ if (locNodeSecCtx.subject().id().equals(subjId))
+ return locNodeSecCtx;
+
+ ClusterNode node =
ofNullable(ctx.discovery().node(subjId)).orElseGet(() ->
ctx.discovery().historicalNode(subjId));
+
+ return node == null
+ ? null
+ : secCtxs.computeIfAbsent(
+ node.id(),
+ uuid -> nodeSecurityContext(marsh,
U.resolveClassLoader(ctx.config()), node)
+ );
+ }
+
/** Restores local node context for the current thread. */
void restoreDefaultContext() {
curSecCtx.set(null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
index 938d512c77b..6dc38c1c60d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
@@ -17,7 +17,7 @@
package org.apache.ignite.spi.discovery;
import java.util.Collection;
-import java.util.Map;
+import java.util.NavigableMap;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
import org.jetbrains.annotations.Nullable;
@@ -39,7 +39,7 @@ public class DiscoveryNotification {
private final Collection<ClusterNode> topSnapshot;
/** Topology history. */
- private @Nullable Map<Long, Collection<ClusterNode>> topHist;
+ private @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist;
/** Custom message data. */
private @Nullable DiscoverySpiCustomMessage customMsgData;
@@ -74,7 +74,7 @@ public class DiscoveryNotification {
long topVer,
ClusterNode node,
Collection<ClusterNode> topSnapshot,
- @Nullable Map<Long, Collection<ClusterNode>> topHist,
+ @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist,
@Nullable DiscoverySpiCustomMessage customMsgData,
SpanContainer spanContainer
) {
@@ -118,7 +118,7 @@ public class DiscoveryNotification {
/**
* @return Topology history.
*/
- public Map<Long, Collection<ClusterNode>> getTopHist() {
+ public NavigableMap<Long, Collection<ClusterNode>> getTopHist() {
return topHist;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d5ebca56245..8c85ee1819b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
@@ -625,7 +626,7 @@ class ServerImpl extends TcpDiscoveryImpl {
topVer++;
- Map<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer,
+ NavigableMap<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer,
Collections.unmodifiableList(top));
lsnr.onDiscovery(
@@ -1703,7 +1704,7 @@ class ServerImpl extends TcpDiscoveryImpl {
Collection<ClusterNode> top = upcast(ring.visibleNodes());
- Map<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer, top);
+ NavigableMap<Long, Collection<ClusterNode>> hist =
updateTopologyHistory(topVer, top);
lsnr.onDiscovery(
new DiscoveryNotification(type, topVer, node, top, hist, null,
spanContainer)
@@ -1727,7 +1728,7 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param top Topology snapshot.
* @return Copy of updated topology history.
*/
- @Nullable private Map<Long, Collection<ClusterNode>>
updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
+ @Nullable private NavigableMap<Long, Collection<ClusterNode>>
updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
synchronized (mux) {
if (topHist.containsKey(topVer))
return null;
@@ -6367,7 +6368,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState spiState = spiStateCopy();
- Map<Long, Collection<ClusterNode>> hist;
+ NavigableMap<Long, Collection<ClusterNode>> hist;
synchronized (mux) {
hist = new TreeMap<>(topHist);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
index 4491183ce0f..55b27645e4e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
@@ -17,49 +17,76 @@
package org.apache.ignite.internal.processors.security;
+import java.lang.reflect.Method;
import java.util.UUID;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.GridManagerAdapter;
+import
org.apache.ignite.internal.managers.communication.GridIoSecurityAwareMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
-import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
/**
* Unit test for {@link IgniteSecurityProcessor}.
*/
-public class IgniteSecurityProcessorTest {
- /**
- * Checks that {@link IgniteSecurityProcessor#withContext(UUID)} throws
exception in case a node ID is unknown.
- */
+public class IgniteSecurityProcessorTest extends AbstractSecurityTest {
+ /** */
+ private static ListeningTestLogger listeningLog;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(
+ String instanceName,
+ AbstractTestSecurityPluginProvider pluginProv
+ ) throws Exception {
+ return super.getConfiguration(instanceName, pluginProv)
+ .setGridLogger(listeningLog);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ listeningLog = new ListeningTestLogger();
+ }
+
+ /** Checks that {@link IgniteSecurityProcessor#withContext(UUID)} throws
exception in case a node ID is unknown. */
@Test
- public void testThrowIllegalStateExceptionIfNodeNotFoundInDiscoCache() {
- GridKernalContext ctx = mock(GridKernalContext.class);
- when(ctx.config()).thenReturn(new IgniteConfiguration());
- when(ctx.discovery()).thenReturn(mock(GridDiscoveryManager.class));
+ public void testThrowIllegalStateExceptionIfNodeNotFoundInDiscoCache()
throws Exception {
+ IgniteEx srv = startGridAllowAll("srv");
- LogListener logLsnr = LogListener
- .matches(s -> s.contains("Failed to obtain a security context."))
- .times(1)
- .build();
+ IgniteEx cli = startClientAllowAll("cli");
- ListeningTestLogger log = new ListeningTestLogger();
+ Method getSpiMethod =
GridManagerAdapter.class.getDeclaredMethod("getSpi");
- log.registerListener(logLsnr);
+ getSpiMethod.setAccessible(true);
- when(ctx.log(IgniteSecurityProcessor.class)).thenReturn(log);
+ TcpCommunicationSpi spi =
(TcpCommunicationSpi)getSpiMethod.invoke(cli.context().io());
- GridSecurityProcessor secPrc = mock(GridSecurityProcessor.class);
+ LogListener logPattern = LogListener
+ .matches(s -> s.contains("Failed to obtain a security context."))
+ .times(1)
+ .build();
- IgniteSecurityProcessor ignSecPrc = new IgniteSecurityProcessor(ctx,
secPrc);
+ listeningLog.registerListener(logPattern);
- assertThrowsWithCause(() -> ignSecPrc.withContext(UUID.randomUUID()),
IllegalStateException.class);
+ spi.sendMessage(srv.localNode(), new GridIoSecurityAwareMessage(
+ UUID.randomUUID(),
+ PUBLIC_POOL,
+ TOPIC_CACHE,
+ TOPIC_CACHE.ordinal(),
+ new AffinityTopologyVersion(),
+ false,
+ 0,
+ false
+ ));
- assertTrue(logLsnr.check());
+ GridTestUtils.waitForCondition(logPattern::check, getTestTimeout());
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
new file mode 100644
index 00000000000..5f681a564c5
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class NodeSecurityContextPropagationTest extends GridCommonAbstractTest
{
+ /** */
+ private static final Collection<UUID> TEST_MESSAGE_ACCEPTED_NODES = new
HashSet<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeOrHaltFailureHandler())
+ .setLocalEventListeners(
+ Collections.singletonMap(e -> {
+ DiscoveryCustomEvent discoEvt = (DiscoveryCustomEvent)e;
+
+ if (discoEvt.customMessage() instanceof
TestDiscoveryAcknowledgeMessage)
+ TEST_MESSAGE_ACCEPTED_NODES.add(discoEvt.node().id());
+
+ return true;
+ }, new int[] {EVT_DISCOVERY_CUSTOM_EVT})
+ )
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(100L * 1024 * 1024)))
+ .setAuthenticationEnabled(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi())
+ .setIpFinder(new TcpDiscoveryVmIpFinder()
+ .setAddresses(Collections.singleton("127.0.0.1:47500")));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteEx startGrid(int idx) throws Exception {
+ IgniteEx ignite = super.startGrid(idx);
+
+ wrapRingMessageWorkerQueue(ignite);
+
+ return ignite;
+ }
+
+ /** */
+ @Test
+ public void testProcessCustomDiscoveryMessageFromLeftNode() throws
Exception {
+ IgniteEx crd = startGrid(0);
+
+ IgniteEx cli = startClientGrid(11);
+
+ IgniteEx srv = startGrid(1);
+
+ CountDownLatch cliLefEvtProcessedByCoordinator = new CountDownLatch(1);
+
+ crd.events().localListen(
+ evt -> {
+ cliLefEvtProcessedByCoordinator.countDown();
+
+ return true;
+ },
+ EVT_NODE_LEFT
+ );
+
+ discoveryRingMessageWorkerQueue(srv).block();
+
+ long pollingTimeout = U.field(discoveryRingMessageWorker(srv),
"pollingTimeout");
+
+ // We need to wait for any active BlockingDeque#poll operation to
complete.
+ U.sleep(5 * pollingTimeout);
+
+ cli.context().discovery().sendCustomEvent(new TestDiscoveryMessage());
+
+ waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> msg
instanceof TestDiscoveryMessage), getTestTimeout());
+
+ runAsync(() -> stopGrid(11));
+
+ cliLefEvtProcessedByCoordinator.await();
+
+ waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> msg
instanceof TcpDiscoveryNodeLeftMessage), getTestTimeout());
+
+ runAsync(() -> startGrid(2));
+
+ waitForCondition(() -> anyReceivedMessageMatch(srv, msg ->
isDiscoveryNodeAddedMessage(msg, 2)), getTestTimeout());
+
+ runAsync(() -> startGrid(3));
+
+ waitForCondition(() -> anyReceivedMessageMatch(srv, msg ->
isDiscoveryNodeAddedMessage(msg, 3)), getTestTimeout());
+
+ discoveryRingMessageWorkerQueue(srv).unblock();
+
+ waitForCondition(
+ () -> grid(0).cluster().nodes().size() == 4
+ &&
TEST_MESSAGE_ACCEPTED_NODES.contains(grid(2).cluster().localNode().id()),
+ getTestTimeout());
+ }
+
+ /** */
+ private boolean isDiscoveryNodeAddedMessage(Object msg, int joiningNdeIdx)
{
+ return msg instanceof TcpDiscoveryNodeAddedMessage &&
+ F.eq(
+ getTestIgniteInstanceName(joiningNdeIdx),
+
((TcpDiscoveryNodeAddedMessage)msg).node().attribute(ATTR_IGNITE_INSTANCE_NAME)
+ );
+ }
+
+ /** */
+ private BlockingDequeWrapper<TcpDiscoveryAbstractMessage>
discoveryRingMessageWorkerQueue(IgniteEx ignite) {
+ return U.field(discoveryRingMessageWorker(ignite), "queue");
+ }
+
+ /** */
+ private boolean anyReceivedMessageMatch(IgniteEx ignite, Predicate<Object>
predicate) {
+ for (TcpDiscoveryAbstractMessage msg :
discoveryRingMessageWorkerQueue(ignite)) {
+ Object unwrappedMsg = msg;
+
+ if (msg instanceof TcpDiscoveryCustomEventMessage) {
+ DiscoverySpiCustomMessage customMsg = U.field(msg, "msg");
+
+ if (customMsg == null) {
+ try {
+ customMsg = U.unmarshal(
+
ignite.context().marshallerContext().jdkMarshaller(),
+ (byte[])U.field(msg, "msgBytes"),
+ U.resolveClassLoader(ignite.configuration())
+ );
+ }
+ catch (IgniteCheckedException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ assert customMsg instanceof SecurityAwareCustomMessageWrapper;
+
+ unwrappedMsg = ((CustomMessageWrapper)customMsg).delegate();
+ }
+
+ if (predicate.test(unwrappedMsg))
+ return true;
+ }
+
+ return false;
+ }
+
+ /** */
+ private void wrapRingMessageWorkerQueue(IgniteEx ignite) throws Exception {
+ Object discoMsgWorker = discoveryRingMessageWorker(ignite);
+
+ BlockingDeque<TcpDiscoveryAbstractMessage> queue =
U.field(discoMsgWorker, "queue");
+
+ BlockingDequeWrapper<TcpDiscoveryAbstractMessage> wrapper = new
BlockingDequeWrapper<>(queue);
+
+ FieldUtils.writeField(discoMsgWorker, "queue", wrapper, true);
+ }
+
+ /** */
+ private Object discoveryRingMessageWorker(IgniteEx ignite) {
+ DiscoverySpi[] discoverySpis = U.field(ignite.context().discovery(),
"spis");
+
+ Object impl = U.field(discoverySpis[0], "impl");
+
+ return U.field(impl, "msgWorker");
+ }
+
+ /** */
+ public static class TestDiscoveryMessage extends
AbstractTestDiscoveryMessage {
+ /** {@inheritDoc} */
+ @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+ return new TestDiscoveryAcknowledgeMessage();
+ }
+ }
+
+ /** */
+ public static class TestDiscoveryAcknowledgeMessage extends
AbstractTestDiscoveryMessage { }
+
+ /** */
+ public abstract static class AbstractTestDiscoveryMessage implements
DiscoveryCustomMessage {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return IgniteUuid.randomUuid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DiscoCache createDiscoCache(
+ GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
+ ) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /** */
+ public static class BlockingDequeWrapper<T> implements BlockingDeque<T> {
+ /** */
+ private volatile boolean isBlocked;
+
+ /** */
+ private final BlockingDeque<T> delegate;
+
+ /** */
+ public BlockingDequeWrapper(BlockingDeque<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ /** */
+ public void block() {
+ isBlocked = true;
+ }
+
+ /** */
+ public void unblock() {
+ isBlocked = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T poll(long timeout, TimeUnit unit) throws
InterruptedException {
+ return isBlocked ? null : delegate.poll(timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int remainingCapacity() {
+ return delegate.remainingCapacity();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public T element() {
+ return delegate.element();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T peek() {
+ return delegate.peek();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(Object o) {
+ return delegate.remove(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsAll(@NotNull Collection<?> c) {
+ return delegate.containsAll(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addAll(@NotNull Collection<? extends T> c) {
+ return delegate.addAll(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeAll(@NotNull Collection<?> c) {
+ return delegate.removeAll(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean retainAll(@NotNull Collection<?> c) {
+ return delegate.retainAll(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ delegate.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addFirst(T t) {
+ delegate.addFirst(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addLast(@NotNull T t) {
+ delegate.addLast(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offerFirst(@NotNull T t) {
+ return delegate.offerFirst(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offerLast(@NotNull T t) {
+ return delegate.offerLast(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T removeFirst() {
+ return delegate.removeFirst();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T removeLast() {
+ return delegate.removeLast();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T pollFirst() {
+ return delegate.pollFirst();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T pollLast() {
+ return delegate.pollLast();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T getFirst() {
+ return delegate.getFirst();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T getLast() {
+ return delegate.getLast();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T peekFirst() {
+ return delegate.peekFirst();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T peekLast() {
+ return delegate.peekLast();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putFirst(@NotNull T t) throws
InterruptedException {
+ delegate.putFirst(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putLast(@NotNull T t) throws
InterruptedException {
+ delegate.putLast(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offerFirst(@NotNull T t, long timeout,
TimeUnit unit) throws InterruptedException {
+ return delegate.offerFirst(t, timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offerLast(@NotNull T t, long timeout,
TimeUnit unit) throws InterruptedException {
+ return delegate.offerLast(t, timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public T takeFirst() throws InterruptedException {
+ return delegate.takeFirst();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public T takeLast() throws InterruptedException {
+ return delegate.takeLast();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public T pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
+ return delegate.pollFirst(timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public T pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
+ return delegate.pollLast(timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeFirstOccurrence(Object o) {
+ return delegate.removeFirstOccurrence(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeLastOccurrence(Object o) {
+ return delegate.removeLastOccurrence(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean add(T t) {
+ return delegate.add(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offer(@NotNull T t) {
+ return delegate.offer(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(@NotNull T t) throws InterruptedException {
+ delegate.put(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean offer(@NotNull T t, long timeout, TimeUnit
unit) throws InterruptedException {
+ return delegate.offer(t, timeout, unit);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public T remove() {
+ return delegate.remove();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T poll() {
+ return delegate.poll();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public T take() throws InterruptedException {
+ return delegate.take();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Object o) {
+ return delegate.contains(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int drainTo(@NotNull Collection<? super T> c) {
+ return delegate.drainTo(c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int drainTo(@NotNull Collection<? super T> c, int
maxElements) {
+ return delegate.drainTo(c, maxElements);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return delegate.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<T> iterator() {
+ return delegate.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Object[] toArray() {
+ return delegate.toArray();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public <T1> T1[] toArray(@NotNull T1[] a) {
+ return delegate.toArray(a);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Iterator<T> descendingIterator() {
+ return delegate.descendingIterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void push(@NotNull T t) {
+ delegate.push(t);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T pop() {
+ return delegate.pop();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
index 5de6c1114ea..e65d0a05f78 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
@@ -47,11 +47,6 @@ public class ClientReconnectTest extends
GridCommonAbstractTest {
@Override protected GridSecurityProcessor
securityProcessor(GridKernalContext ctx) {
return new TestReconnectProcessor(ctx) {
@Override public SecurityContext securityContext(UUID
subjId) {
- if (ctx.localNodeId().equals(subjId))
- return ctx.security().securityContext();
-
- fail("Unexpected subjId[subjId=" + subjId + ",
localNodeId=" + ctx.localNodeId() + ']');
-
return null;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 9bd400f66d6..3c55f944cbc 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import
org.apache.ignite.internal.processors.security.IgniteSecurityProcessorTest;
import org.apache.ignite.internal.processors.security.InvalidServerTest;
+import
org.apache.ignite.internal.processors.security.NodeSecurityContextPropagationTest;
import
org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCheckTest;
import
org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCreateDestroyCheckTest;
import
org.apache.ignite.internal.processors.security.cache.ContinuousQueryPermissionCheckTest;
@@ -134,7 +135,8 @@ import org.junit.runners.Suite;
MaintenanceModeNodeSecurityTest.class,
ServiceAuthorizationTest.class,
ServiceStaticConfigTest.class,
- ClusterNodeOperationPermissionTest.class
+ ClusterNodeOperationPermissionTest.class,
+ NodeSecurityContextPropagationTest.class
})
public class SecurityTestSuite {
/** */
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 6803af48da1..3b3b2546904 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -513,7 +513,7 @@ public class ZookeeperDiscoveryImpl {
rtState.evtsData.topVer,
locNode,
rtState.top.topologySnapshot(),
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -586,7 +586,7 @@ public class ZookeeperDiscoveryImpl {
rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
locNode,
nodes,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -2367,7 +2367,7 @@ public class ZookeeperDiscoveryImpl {
1L,
locNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -3075,7 +3075,7 @@ public class ZookeeperDiscoveryImpl {
joinedEvtData.topVer,
locNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -3088,7 +3088,7 @@ public class ZookeeperDiscoveryImpl {
joinedEvtData.topVer,
locNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -3547,7 +3547,7 @@ public class ZookeeperDiscoveryImpl {
evtData.topologyVersion(),
sndNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
msg,
null
)
@@ -3577,7 +3577,7 @@ public class ZookeeperDiscoveryImpl {
joinedEvtData.topVer,
joinedNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)
@@ -3627,7 +3627,7 @@ public class ZookeeperDiscoveryImpl {
topVer,
leftNode,
topSnapshot,
- Collections.emptyMap(),
+ Collections.emptyNavigableMap(),
null,
null
)