This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 17928fe8b7d IGNITE-19410 Fixed node crash due to SecurityContext not being found during discovery message processing. (#10701)
17928fe8b7d is described below

commit 17928fe8b7d8c02c29c2a8360f8a3ada479d6a2d
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu May 25 11:25:05 2023 +0300

    IGNITE-19410 Fixed node crash due to SecurityContext not being found during discovery message processing. (#10701)
---
 .../managers/discovery/GridDiscoveryManager.java   |  21 +-
 .../security/IgniteSecurityProcessor.java          |  40 +-
 .../spi/discovery/DiscoveryNotification.java       |   8 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |   9 +-
 .../security/IgniteSecurityProcessorTest.java      |  77 ++-
 .../NodeSecurityContextPropagationTest.java        | 551 +++++++++++++++++++++
 .../security/client/ClientReconnectTest.java       |   5 -
 .../ignite/testsuites/SecurityTestSuite.java       |   4 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java        |  16 +-
 9 files changed, 667 insertions(+), 64 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2a733d942b8..b5dd27c0ee8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -258,7 +259,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE);
 
     /** Topology snapshots history. */
-    private volatile Map<Long, Collection<ClusterNode>> topHist = new 
HashMap<>();
+    private volatile NavigableMap<Long, Collection<ClusterNode>> topHist = 
Collections.emptyNavigableMap();
 
     /** Topology version. */
     private final AtomicReference<Snapshot> topSnap =
@@ -606,7 +607,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 final ClusterNode locNode = localNode();
 
                 if (notification.getTopHist() != null)
-                    topHist = notification.getTopHist();
+                    topHist = 
Collections.unmodifiableNavigableMap(notification.getTopHist());
 
                 boolean verChanged;
 
@@ -844,7 +845,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         assert rmvd != null : histVer;
                     }
 
-                    topHist.clear();
+                    topHist = Collections.emptyNavigableMap();
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
                         createDiscoCache(AffinityTopologyVersion.ZERO, 
ctx.state().clusterState(), locNode,
@@ -2671,11 +2672,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return resolved node, or <code>null</code> if node not found.
      */
     public ClusterNode historicalNode(UUID nodeId) {
+        long lastCheckedLocTopVer = Long.MAX_VALUE;
+
         for (DiscoCache discoCache : discoCacheHist.descendingValues()) {
             ClusterNode node = discoCache.node(nodeId);
 
             if (node != null)
                 return node;
+
+            lastCheckedLocTopVer = discoCache.version().topologyVersion();
+        }
+
+        // We did not find node with given ID in the discovery history of the 
local node. This means that the local
+        // node could join the cluster after the node with given ID left it. 
Let's check in the global topology history,
+        // which contains all topology versions since the cluster was started.
+        for (Collection<ClusterNode> top : 
topHist.headMap(lastCheckedLocTopVer, false).descendingMap().values()) {
+            for (ClusterNode node : top) {
+                if (F.eq(node.id(), nodeId))
+                    return node;
+            }
         }
 
         return null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
index 11cd00b8cdf..0840426667f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.security;
 import java.security.Security;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +45,7 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.processors.security.SecurityUtils.IGNITE_INTERNAL_PACKAGE;
@@ -148,21 +148,13 @@ public class IgniteSecurityProcessor extends IgniteSecurityAdapter {
     /** {@inheritDoc} */
     @Override public OperationSecurityContext withContext(UUID subjId) {
         try {
-            ClusterNode node = 
Optional.ofNullable(ctx.discovery().node(subjId))
-                .orElseGet(() -> ctx.discovery().historicalNode(subjId));
-
-            SecurityContext res;
-
-            if (node == null)
-                res = secPrc.securityContext(subjId);
-            else if (dfltSecCtx.subject().id().equals(subjId))
-                res = dfltSecCtx;
-            else
-                res = secCtxs.computeIfAbsent(subjId, uuid -> 
nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node));
+            SecurityContext res = secPrc.securityContext(subjId);
 
             if (res == null) {
-                throw new IllegalStateException("Failed to find security 
context " +
-                    "for subject with given ID : " + subjId);
+                res = findNodeSecurityContext(subjId);
+
+                if (res == null)
+                    throw new IllegalStateException("Failed to find security 
context for subject with given ID : " + subjId);
             }
 
             return withContext(res);
@@ -174,6 +166,26 @@ public class IgniteSecurityProcessor extends IgniteSecurityAdapter {
         }
     }
 
+    /**
+     * Looks for a node which ID is equal to the given Subject ID. If such a 
node exists, returns the Security Context
+     * that belongs to it. Otherwise {@code null}.
+     */
+    @Nullable private SecurityContext findNodeSecurityContext(UUID subjId) {
+        SecurityContext locNodeSecCtx = dfltSecCtx;
+
+        if (locNodeSecCtx.subject().id().equals(subjId))
+            return locNodeSecCtx;
+
+        ClusterNode node = 
ofNullable(ctx.discovery().node(subjId)).orElseGet(() -> 
ctx.discovery().historicalNode(subjId));
+
+        return node == null
+            ? null
+            : secCtxs.computeIfAbsent(
+                node.id(),
+                uuid -> nodeSecurityContext(marsh, 
U.resolveClassLoader(ctx.config()), node)
+            );
+    }
+
     /** Restores local node context for the current thread. */
     void restoreDefaultContext() {
         curSecCtx.set(null);
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
index 938d512c77b..6dc38c1c60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java
@@ -17,7 +17,7 @@
 package org.apache.ignite.spi.discovery;
 
 import java.util.Collection;
-import java.util.Map;
+import java.util.NavigableMap;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
 import org.jetbrains.annotations.Nullable;
@@ -39,7 +39,7 @@ public class DiscoveryNotification {
     private final Collection<ClusterNode> topSnapshot;
 
     /** Topology history. */
-    private @Nullable Map<Long, Collection<ClusterNode>> topHist;
+    private @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist;
 
     /** Custom message data. */
     private @Nullable DiscoverySpiCustomMessage customMsgData;
@@ -74,7 +74,7 @@ public class DiscoveryNotification {
         long topVer,
         ClusterNode node,
         Collection<ClusterNode> topSnapshot,
-        @Nullable Map<Long, Collection<ClusterNode>> topHist,
+        @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist,
         @Nullable DiscoverySpiCustomMessage customMsgData,
         SpanContainer spanContainer
     ) {
@@ -118,7 +118,7 @@ public class DiscoveryNotification {
     /**
      * @return Topology history.
      */
-    public Map<Long, Collection<ClusterNode>> getTopHist() {
+    public NavigableMap<Long, Collection<ClusterNode>> getTopHist() {
         return topHist;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d5ebca56245..8c85ee1819b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
@@ -625,7 +626,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     topVer++;
 
-                    Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer,
+                    NavigableMap<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer,
                         Collections.unmodifiableList(top));
 
                     lsnr.onDiscovery(
@@ -1703,7 +1704,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             Collection<ClusterNode> top = upcast(ring.visibleNodes());
 
-            Map<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
+            NavigableMap<Long, Collection<ClusterNode>> hist = 
updateTopologyHistory(topVer, top);
 
             lsnr.onDiscovery(
                 new DiscoveryNotification(type, topVer, node, top, hist, null, 
spanContainer)
@@ -1727,7 +1728,7 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param top Topology snapshot.
      * @return Copy of updated topology history.
      */
-    @Nullable private Map<Long, Collection<ClusterNode>> 
updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
+    @Nullable private NavigableMap<Long, Collection<ClusterNode>> 
updateTopologyHistory(long topVer, Collection<ClusterNode> top) {
         synchronized (mux) {
             if (topHist.containsKey(topVer))
                 return null;
@@ -6367,7 +6368,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState spiState = spiStateCopy();
 
-            Map<Long, Collection<ClusterNode>> hist;
+            NavigableMap<Long, Collection<ClusterNode>> hist;
 
             synchronized (mux) {
                 hist = new TreeMap<>(topHist);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
index 4491183ce0f..55b27645e4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/IgniteSecurityProcessorTest.java
@@ -17,49 +17,76 @@
 
 package org.apache.ignite.internal.processors.security;
 
+import java.lang.reflect.Method;
 import java.util.UUID;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.GridManagerAdapter;
+import 
org.apache.ignite.internal.managers.communication.GridIoSecurityAwareMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.junit.Test;
 
-import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
  * Unit test for {@link IgniteSecurityProcessor}.
  */
-public class IgniteSecurityProcessorTest {
-    /**
-     * Checks that {@link IgniteSecurityProcessor#withContext(UUID)} throws 
exception in case a node ID is unknown.
-     */
+public class IgniteSecurityProcessorTest extends AbstractSecurityTest {
+    /** */
+    private static ListeningTestLogger listeningLog;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(
+        String instanceName,
+        AbstractTestSecurityPluginProvider pluginProv
+    ) throws Exception {
+        return super.getConfiguration(instanceName, pluginProv)
+            .setGridLogger(listeningLog);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        listeningLog = new ListeningTestLogger();
+    }
+
+    /** Checks that {@link IgniteSecurityProcessor#withContext(UUID)} throws 
exception in case a node ID is unknown. */
     @Test
-    public void testThrowIllegalStateExceptionIfNodeNotFoundInDiscoCache() {
-        GridKernalContext ctx = mock(GridKernalContext.class);
-        when(ctx.config()).thenReturn(new IgniteConfiguration());
-        when(ctx.discovery()).thenReturn(mock(GridDiscoveryManager.class));
+    public void testThrowIllegalStateExceptionIfNodeNotFoundInDiscoCache() 
throws Exception {
+        IgniteEx srv = startGridAllowAll("srv");
 
-        LogListener logLsnr = LogListener
-            .matches(s -> s.contains("Failed to obtain a security context."))
-            .times(1)
-            .build();
+        IgniteEx cli = startClientAllowAll("cli");
 
-        ListeningTestLogger log = new ListeningTestLogger();
+        Method getSpiMethod = 
GridManagerAdapter.class.getDeclaredMethod("getSpi");
 
-        log.registerListener(logLsnr);
+        getSpiMethod.setAccessible(true);
 
-        when(ctx.log(IgniteSecurityProcessor.class)).thenReturn(log);
+        TcpCommunicationSpi spi = 
(TcpCommunicationSpi)getSpiMethod.invoke(cli.context().io());
 
-        GridSecurityProcessor secPrc = mock(GridSecurityProcessor.class);
+        LogListener logPattern = LogListener
+            .matches(s -> s.contains("Failed to obtain a security context."))
+            .times(1)
+            .build();
 
-        IgniteSecurityProcessor ignSecPrc = new IgniteSecurityProcessor(ctx, 
secPrc);
+        listeningLog.registerListener(logPattern);
 
-        assertThrowsWithCause(() -> ignSecPrc.withContext(UUID.randomUUID()), 
IllegalStateException.class);
+        spi.sendMessage(srv.localNode(), new GridIoSecurityAwareMessage(
+            UUID.randomUUID(),
+            PUBLIC_POOL,
+            TOPIC_CACHE,
+            TOPIC_CACHE.ordinal(),
+            new AffinityTopologyVersion(),
+            false,
+            0,
+            false
+        ));
 
-        assertTrue(logLsnr.check());
+        GridTestUtils.waitForCondition(logPattern::check, getTestTimeout());
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
new file mode 100644
index 00000000000..5f681a564c5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.security;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import 
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class NodeSecurityContextPropagationTest extends GridCommonAbstractTest 
{
+    /** */
+    private static final Collection<UUID> TEST_MESSAGE_ACCEPTED_NODES = new 
HashSet<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeOrHaltFailureHandler())
+            .setLocalEventListeners(
+                Collections.singletonMap(e -> {
+                    DiscoveryCustomEvent discoEvt = (DiscoveryCustomEvent)e;
+
+                    if (discoEvt.customMessage() instanceof 
TestDiscoveryAcknowledgeMessage)
+                        TEST_MESSAGE_ACCEPTED_NODES.add(discoEvt.node().id());
+
+                    return true;
+                }, new int[] {EVT_DISCOVERY_CUSTOM_EVT})
+            )
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setMaxSize(100L * 1024 * 1024)))
+            .setAuthenticationEnabled(true);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi())
+            .setIpFinder(new TcpDiscoveryVmIpFinder()
+                .setAddresses(Collections.singleton("127.0.0.1:47500")));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteEx startGrid(int idx) throws Exception {
+        IgniteEx ignite = super.startGrid(idx);
+
+        wrapRingMessageWorkerQueue(ignite);
+
+        return ignite;
+    }
+
+    /** */
+    @Test
+    public void testProcessCustomDiscoveryMessageFromLeftNode() throws 
Exception {
+        IgniteEx crd = startGrid(0);
+
+        IgniteEx cli = startClientGrid(11);
+
+        IgniteEx srv = startGrid(1);
+
+        CountDownLatch cliLefEvtProcessedByCoordinator = new CountDownLatch(1);
+
+        crd.events().localListen(
+            evt -> {
+                cliLefEvtProcessedByCoordinator.countDown();
+
+                return true;
+            },
+            EVT_NODE_LEFT
+        );
+
+        discoveryRingMessageWorkerQueue(srv).block();
+
+        long pollingTimeout = U.field(discoveryRingMessageWorker(srv), 
"pollingTimeout");
+
+        // We need to wait for any active BlockingDeque#poll operation to 
complete.
+        U.sleep(5 * pollingTimeout);
+
+        cli.context().discovery().sendCustomEvent(new TestDiscoveryMessage());
+
+        waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> msg 
instanceof TestDiscoveryMessage), getTestTimeout());
+
+        runAsync(() -> stopGrid(11));
+
+        cliLefEvtProcessedByCoordinator.await();
+
+        waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> msg 
instanceof TcpDiscoveryNodeLeftMessage), getTestTimeout());
+
+        runAsync(() -> startGrid(2));
+
+        waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> 
isDiscoveryNodeAddedMessage(msg, 2)), getTestTimeout());
+
+        runAsync(() -> startGrid(3));
+
+        waitForCondition(() -> anyReceivedMessageMatch(srv, msg -> 
isDiscoveryNodeAddedMessage(msg, 3)), getTestTimeout());
+
+        discoveryRingMessageWorkerQueue(srv).unblock();
+
+        waitForCondition(
+            () -> grid(0).cluster().nodes().size() == 4
+                && 
TEST_MESSAGE_ACCEPTED_NODES.contains(grid(2).cluster().localNode().id()),
+            getTestTimeout());
+    }
+
+    /** */
+    private boolean isDiscoveryNodeAddedMessage(Object msg, int joiningNdeIdx) 
{
+        return msg instanceof TcpDiscoveryNodeAddedMessage &&
+            F.eq(
+                getTestIgniteInstanceName(joiningNdeIdx),
+                
((TcpDiscoveryNodeAddedMessage)msg).node().attribute(ATTR_IGNITE_INSTANCE_NAME)
+            );
+    }
+
+    /** */
+    private BlockingDequeWrapper<TcpDiscoveryAbstractMessage> 
discoveryRingMessageWorkerQueue(IgniteEx ignite) {
+        return U.field(discoveryRingMessageWorker(ignite), "queue");
+    }
+
+    /** */
+    private boolean anyReceivedMessageMatch(IgniteEx ignite, Predicate<Object> 
predicate) {
+        for (TcpDiscoveryAbstractMessage msg : 
discoveryRingMessageWorkerQueue(ignite)) {
+            Object unwrappedMsg = msg;
+
+            if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                DiscoverySpiCustomMessage customMsg = U.field(msg, "msg");
+
+                if (customMsg == null) {
+                    try {
+                        customMsg = U.unmarshal(
+                            
ignite.context().marshallerContext().jdkMarshaller(),
+                            (byte[])U.field(msg, "msgBytes"),
+                            U.resolveClassLoader(ignite.configuration())
+                        );
+                    }
+                    catch (IgniteCheckedException e) {
+                        fail(e.getMessage());
+                    }
+                }
+
+                assert customMsg instanceof SecurityAwareCustomMessageWrapper;
+
+                unwrappedMsg = ((CustomMessageWrapper)customMsg).delegate();
+            }
+
+            if (predicate.test(unwrappedMsg))
+                return true;
+        }
+
+        return false;
+    }
+
+    /** */
+    private void wrapRingMessageWorkerQueue(IgniteEx ignite) throws Exception {
+        Object discoMsgWorker = discoveryRingMessageWorker(ignite);
+
+        BlockingDeque<TcpDiscoveryAbstractMessage> queue = 
U.field(discoMsgWorker, "queue");
+
+        BlockingDequeWrapper<TcpDiscoveryAbstractMessage> wrapper = new 
BlockingDequeWrapper<>(queue);
+
+        FieldUtils.writeField(discoMsgWorker, "queue", wrapper, true);
+    }
+
+    /** */
+    private Object discoveryRingMessageWorker(IgniteEx ignite) {
+        DiscoverySpi[] discoverySpis = U.field(ignite.context().discovery(), 
"spis");
+
+        Object impl = U.field(discoverySpis[0], "impl");
+
+        return U.field(impl, "msgWorker");
+    }
+
+    /** */
+    public static class TestDiscoveryMessage extends 
AbstractTestDiscoveryMessage {
+        /** {@inheritDoc} */
+        @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+            return new TestDiscoveryAcknowledgeMessage();
+        }
+    }
+
+    /** */
+    public static class TestDiscoveryAcknowledgeMessage extends 
AbstractTestDiscoveryMessage { }
+
+    /** */
+    public abstract static class AbstractTestDiscoveryMessage implements 
DiscoveryCustomMessage {
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return IgniteUuid.randomUuid();
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(
+            GridDiscoveryManager mgr,
+            AffinityTopologyVersion topVer,
+            DiscoCache discoCache
+        ) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /** */
+    public static class BlockingDequeWrapper<T> implements BlockingDeque<T> {
+        /** */
+        private volatile boolean isBlocked;
+
+        /** */
+        private final BlockingDeque<T> delegate;
+
+        /** */
+        public BlockingDequeWrapper(BlockingDeque<T> delegate) {
+            this.delegate = delegate;
+        }
+
+        /** */
+        public void block() {
+            isBlocked = true;
+        }
+
+        /** */
+        public void unblock() {
+            isBlocked = false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T poll(long timeout, TimeUnit unit) throws 
InterruptedException {
+            return isBlocked ? null : delegate.poll(timeout, unit);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int remainingCapacity() {
+            return delegate.remainingCapacity();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public T element() {
+            return delegate.element();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T peek() {
+            return delegate.peek();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            return delegate.remove(o);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean containsAll(@NotNull Collection<?> c) {
+            return delegate.containsAll(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean addAll(@NotNull Collection<? extends T> c) {
+            return delegate.addAll(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeAll(@NotNull Collection<?> c) {
+            return delegate.removeAll(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean retainAll(@NotNull Collection<?> c) {
+            return delegate.retainAll(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clear() {
+            delegate.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void addFirst(T t) {
+            delegate.addFirst(t);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void addLast(@NotNull T t) {
+            delegate.addLast(t);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean offerFirst(@NotNull T t) {
+            return delegate.offerFirst(t);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean offerLast(@NotNull T t) {
+            return delegate.offerLast(t);
+        }
+
+        /** {@inheritDoc} */
+        @Override public T removeFirst() {
+            return delegate.removeFirst();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T removeLast() {
+            return delegate.removeLast();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T pollFirst() {
+            return delegate.pollFirst();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T pollLast() {
+            return delegate.pollLast();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T getFirst() {
+            return delegate.getFirst();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T getLast() {
+            return delegate.getLast();
+        }
+
+        /** {@inheritDoc} */
+        @Override public T peekFirst() {
+            return delegate.peekFirst();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.peekLast()} and returns whatever it yields. */
+        @Override public T peekLast() {
+            return delegate.peekLast();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.putFirst(t)}; an {@link InterruptedException} from the delegate propagates. */
+        @Override public void putFirst(@NotNull T t) throws InterruptedException {
+            delegate.putFirst(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.putLast(t)}; an {@link InterruptedException} from the delegate propagates. */
+        @Override public void putLast(@NotNull T t) throws InterruptedException {
+            delegate.putLast(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.offerFirst(t, timeout, unit)} and returns its result. */
+        @Override public boolean offerFirst(@NotNull T t, long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.offerFirst(t, timeout, unit);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.offerLast(t, timeout, unit)} and returns its result. */
+        @Override public boolean offerLast(@NotNull T t, long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.offerLast(t, timeout, unit);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.takeFirst()}; an {@link InterruptedException} from the delegate propagates. */
+        @NotNull @Override public T takeFirst() throws InterruptedException {
+            return delegate.takeFirst();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.takeLast()}; an {@link InterruptedException} from the delegate propagates. */
+        @NotNull @Override public T takeLast() throws InterruptedException {
+            return delegate.takeLast();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.pollFirst(timeout, unit)} and returns whatever it yields. */
+        @Nullable @Override public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.pollFirst(timeout, unit);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.pollLast(timeout, unit)} and returns whatever it yields. */
+        @Nullable @Override public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.pollLast(timeout, unit);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.removeFirstOccurrence(o)} and returns its result. */
+        @Override public boolean removeFirstOccurrence(Object o) {
+            return delegate.removeFirstOccurrence(o);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.removeLastOccurrence(o)} and returns its result. */
+        @Override public boolean removeLastOccurrence(Object o) {
+            return delegate.removeLastOccurrence(o);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.add(t)} and returns its result. */
+        @Override public boolean add(T t) {
+            return delegate.add(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.offer(t)} and returns its result. */
+        @Override public boolean offer(@NotNull T t) {
+            return delegate.offer(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.put(t)}; an {@link InterruptedException} from the delegate propagates. */
+        @Override public void put(@NotNull T t) throws InterruptedException {
+            delegate.put(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.offer(t, timeout, unit)} and returns its result. */
+        @Override public boolean offer(@NotNull T t, long timeout, TimeUnit unit) throws InterruptedException {
+            return delegate.offer(t, timeout, unit);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.remove()} and returns the element it yields. */
+        @NotNull @Override public T remove() {
+            return delegate.remove();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.poll()} and returns whatever it yields. */
+        @Override public T poll() {
+            return delegate.poll();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.take()}; an {@link InterruptedException} from the delegate propagates. */
+        @NotNull @Override public T take() throws InterruptedException {
+            return delegate.take();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.contains(o)} and returns its result. */
+        @Override public boolean contains(Object o) {
+            return delegate.contains(o);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.drainTo(c)} and returns the count it reports. */
+        @Override public int drainTo(@NotNull Collection<? super T> c) {
+            return delegate.drainTo(c);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.drainTo(c, maxElements)} and returns the count it reports. */
+        @Override public int drainTo(@NotNull Collection<? super T> c, int maxElements) {
+            return delegate.drainTo(c, maxElements);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.size()} and returns its result. */
+        @Override public int size() {
+            return delegate.size();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.isEmpty()} and returns its result. */
+        @Override public boolean isEmpty() {
+            return delegate.isEmpty();
+        }
+
+        /** {@inheritDoc} Returns the iterator produced by {@code delegate.iterator()} without wrapping it. */
+        @Override public Iterator<T> iterator() {
+            return delegate.iterator();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.toArray()} and returns the array it produces. */
+        @NotNull @Override public Object[] toArray() {
+            return delegate.toArray();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.toArray(a)} and returns the array it produces. */
+        @NotNull @Override public <T1> T1[] toArray(@NotNull T1[] a) {
+            return delegate.toArray(a);
+        }
+
+        /** {@inheritDoc} Returns the iterator produced by {@code delegate.descendingIterator()} without wrapping it. */
+        @NotNull @Override public Iterator<T> descendingIterator() {
+            return delegate.descendingIterator();
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.push(t)}. */
+        @Override public void push(@NotNull T t) {
+            delegate.push(t);
+        }
+
+        /** {@inheritDoc} Forwards to {@code delegate.pop()} and returns the element it yields. */
+        @Override public T pop() {
+            return delegate.pop();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
index 5de6c1114ea..e65d0a05f78 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ClientReconnectTest.java
@@ -47,11 +47,6 @@ public class ClientReconnectTest extends GridCommonAbstractTest {
             @Override protected GridSecurityProcessor 
securityProcessor(GridKernalContext ctx) {
                 return new TestReconnectProcessor(ctx) {
                     @Override public SecurityContext securityContext(UUID 
subjId) {
-                        if (ctx.localNodeId().equals(subjId))
-                            return ctx.security().securityContext();
-
-                        fail("Unexpected subjId[subjId=" + subjId + ", 
localNodeId=" + ctx.localNodeId() + ']');
-
                         return null;
                     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
index 9bd400f66d6..3c55f944cbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/SecurityTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import 
org.apache.ignite.internal.processors.security.IgniteSecurityProcessorTest;
 import org.apache.ignite.internal.processors.security.InvalidServerTest;
+import 
org.apache.ignite.internal.processors.security.NodeSecurityContextPropagationTest;
 import 
org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCheckTest;
 import 
org.apache.ignite.internal.processors.security.cache.CacheOperationPermissionCreateDestroyCheckTest;
 import 
org.apache.ignite.internal.processors.security.cache.ContinuousQueryPermissionCheckTest;
@@ -134,7 +135,8 @@ import org.junit.runners.Suite;
     MaintenanceModeNodeSecurityTest.class,
     ServiceAuthorizationTest.class,
     ServiceStaticConfigTest.class,
-    ClusterNodeOperationPermissionTest.class
+    ClusterNodeOperationPermissionTest.class,
+    NodeSecurityContextPropagationTest.class
 })
 public class SecurityTestSuite {
     /** */
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 6803af48da1..3b3b2546904 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -513,7 +513,7 @@ public class ZookeeperDiscoveryImpl {
                     rtState.evtsData.topVer,
                     locNode,
                     rtState.top.topologySnapshot(),
-                    Collections.emptyMap(),
+                    Collections.emptyNavigableMap(),
                     null,
                     null
                 )
@@ -586,7 +586,7 @@ public class ZookeeperDiscoveryImpl {
                 rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
                 locNode,
                 nodes,
-                Collections.emptyMap(),
+                Collections.emptyNavigableMap(),
                 null,
                 null
             )
@@ -2367,7 +2367,7 @@ public class ZookeeperDiscoveryImpl {
                     1L,
                     locNode,
                     topSnapshot,
-                    Collections.emptyMap(),
+                    Collections.emptyNavigableMap(),
                     null,
                     null
                 )
@@ -3075,7 +3075,7 @@ public class ZookeeperDiscoveryImpl {
                     joinedEvtData.topVer,
                     locNode,
                     topSnapshot,
-                    Collections.emptyMap(),
+                    Collections.emptyNavigableMap(),
                     null,
                     null
                 )
@@ -3088,7 +3088,7 @@ public class ZookeeperDiscoveryImpl {
                         joinedEvtData.topVer,
                         locNode,
                         topSnapshot,
-                        Collections.emptyMap(),
+                        Collections.emptyNavigableMap(),
                         null,
                         null
                     )
@@ -3547,7 +3547,7 @@ public class ZookeeperDiscoveryImpl {
                 evtData.topologyVersion(),
                 sndNode,
                 topSnapshot,
-                Collections.emptyMap(),
+                Collections.emptyNavigableMap(),
                 msg,
                 null
             )
@@ -3577,7 +3577,7 @@ public class ZookeeperDiscoveryImpl {
                 joinedEvtData.topVer,
                 joinedNode,
                 topSnapshot,
-                Collections.emptyMap(),
+                Collections.emptyNavigableMap(),
                 null,
                 null
             )
@@ -3627,7 +3627,7 @@ public class ZookeeperDiscoveryImpl {
                 topVer,
                 leftNode,
                 topSnapshot,
-                Collections.emptyMap(),
+                Collections.emptyNavigableMap(),
                 null,
                 null
             )


Reply via email to