This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c843b8f  IGNITE-5569 Fix TCP Discovery SPI allows multiple NODE_JOINED 
/ NODE_FAILED leading to a cluster DDoS - Fixes #6108.
c843b8f is described below

commit c843b8fe0ca624231d384d9f56a9635c336c91c4
Author: Sergey Chugunov <[email protected]>
AuthorDate: Fri Feb 15 00:46:37 2019 +0300

    IGNITE-5569 Fix TCP Discovery SPI allows multiple NODE_JOINED / NODE_FAILED 
leading to a cluster DDoS - Fixes #6108.
    
    Signed-off-by: Dmitriy Govorukhin <[email protected]>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   5 +
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  97 +++++++++-
 .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java |   3 +
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  14 +-
 .../discovery/tcp/TcpDiscoveryFailedJoinTest.java  | 209 +++++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java           |   3 +
 6 files changed, 326 insertions(+), 5 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index cd5f496..8438968 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1117,6 +1117,11 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_RECOVERY_SEMAPHORE_PERMITS = 
"IGNITE_RECOVERY_SEMAPHORE_PERMITS";
 
     /**
+     * Maximum size of the history of server node IDs that have ever 
joined the current topology.
+     */
+    public static final String IGNITE_NODE_IDS_HISTORY_SIZE = 
"IGNITE_NODE_IDS_HISTORY_SIZE";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e1577fb..d5149ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -152,6 +152,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_NODE_IDS_HISTORY_SIZE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -278,6 +279,15 @@ class ServerImpl extends TcpDiscoveryImpl {
     private IgniteFuture<?> lastCustomEvtLsnrFut;
 
     /**
+     * Maximum size of the history of IDs of server nodes that have ever joined the 
current topology (read from IGNITE_NODE_IDS_HISTORY_SIZE, 50 by default).
+     */
+    private static final int JOINED_NODE_IDS_HISTORY_SIZE = 
getInteger(IGNITE_NODE_IDS_HISTORY_SIZE, 50);
+
+    /** Bounded history of IDs of server nodes that the local node has seen join the topology. */
+    private final GridBoundedLinkedHashSet<UUID> nodesIdsHist =
+        new GridBoundedLinkedHashSet<>(JOINED_NODE_IDS_HISTORY_SIZE);
+
+    /**
      * @param adapter Adapter.
      */
     ServerImpl(TcpDiscoverySpi adapter) {
@@ -1116,6 +1126,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return false;
 
             boolean retry = false;
+            boolean joinImpossible = false;
+
             Collection<Exception> errs = new ArrayList<>();
 
             for (InetSocketAddress addr : addrs) {
@@ -1154,6 +1166,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                             // Join request sending succeeded, wait for 
response from topology.
                             return true;
 
+                        case RES_JOIN_IMPOSSIBLE:
+                            joinImpossible = true;
+
+                            break;
+
                         default:
                             // Concurrent startup, try next node.
                             if (res == RES_CONTINUE_JOIN) {
@@ -1185,6 +1202,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     noResAddrs.add(addr);
                 }
+
+                if (joinImpossible)
+                    throw new IgniteSpiException("Impossible to continue join, 
" +
+                        "check if local discovery and communication ports " +
+                        "are not blocked with firewall [addr=" + addr +
+                        ", req=" + joinReq + ", discoLocalPort=" + 
spi.getLocalPort() +
+                        ", discoLocalPortRange=" + spi.getLocalPortRange() + 
']');
             }
 
             if (retry) {
@@ -1295,7 +1319,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                     boolean ignore = false;
 
-                    synchronized (failedNodes) {
+                    synchronized (mux) {
                         for (TcpDiscoveryNode failedNode : 
failedNodes.keySet()) {
                             if (failedNode.id().equals(res.creatorNodeId())) {
                                 if (log.isDebugEnabled())
@@ -3839,6 +3863,26 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     return;
                 }
+                else {
+                    if (!node.isClient() && !node.isDaemon()) {
+                        if (nodesIdsHist.contains(node.id())) {
+                            try {
+                                trySendMessageDirectly(node, new 
TcpDiscoveryDuplicateIdMessage(locNodeId,
+                                    node));
+                            }
+                            catch (IgniteSpiException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send duplicate ID 
message to node " +
+                                        "[node=" + node +
+                                        ", err=" + e.getMessage() + ']');
+
+                                onException("Failed to send duplicate ID 
message to node: " + node, e);
+                            }
+
+                            return;
+                        }
+                    }
+                }
 
                 if (spi.nodeAuth != null) {
                     // Authenticate node first.
@@ -4438,6 +4482,13 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified() && !locNodeId.equals(node.id())) {
+                if (!node.isClient() && !node.isDaemon() && 
nodesIdsHist.contains(node.id())) {
+                    U.warn(log, "Discarding node added message since local 
node has already seen " +
+                        "joining node in topology [node=" + node + ", 
locNode=" + locNode + ", msg=" + msg + ']');
+
+                    return;
+                }
+
                 if (node.internalOrder() <= ring.maxInternalOrder()) {
                     if (log.isDebugEnabled())
                         log.debug("Discarding node added message since new 
node's order is less than " +
@@ -4784,9 +4835,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                     lastMsg = msg;
                 }
 
-                if (state == CONNECTED)
+                if (state == CONNECTED) {
                     notifyDiscovery(EVT_NODE_JOINED, topVer, node);
 
+                    if (!node.isClient() && !node.isDaemon())
+                        nodesIdsHist.add(node.id());
+                }
+
                 try {
                     if (spi.ipFinder.isShared() && locNodeCoord && 
node.clientRouterNodeId() == null)
                         spi.ipFinder.registerAddresses(node.socketAddresses());
@@ -6915,6 +6970,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                 spi.getSocketTimeout();
 
             if (state == CONNECTED) {
+                TcpDiscoveryNode node = msg.node();
+
+                // Check that joining node can accept incoming connections.
+                if (node.clientRouterNodeId() == null) {
+                    if (!pingJoiningNode(node)) {
+                        spi.writeToSocket(msg, sock, RES_JOIN_IMPOSSIBLE, 
sockTimeout);
+
+                        return false;
+                    }
+                }
+
                 spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                 if (log.isDebugEnabled())
@@ -6965,6 +7031,33 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @param node Remote node to ping.
+         *
+         * @return {@code True} if ping was successful and {@code false} if 
ping failed for any reason.
+         */
+        private boolean pingJoiningNode(TcpDiscoveryNode node) {
+            for (InetSocketAddress addr : spi.getNodeAddresses(node, false)) {
+                try {
+                    if (!(addr.getAddress().isLoopbackAddress() && 
locNode.socketAddresses().contains(addr))) {
+                        IgniteBiTuple<UUID, Boolean> t = pingNode(addr, 
node.id(), null);
+
+                        if (t != null)
+                            return true;
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to ping joining node, closing 
connection. [node=" + node +
+                            ", err=" + e.getMessage() + ']');
+                }
+            }
+
+            U.warn(log, "Failed to ping joining node, closing connection. 
[node=" + node + ']');
+
+            return false;
+        }
+
         /** {@inheritDoc} */
         @Override public void interrupt() {
             super.interrupt();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 917e340..7e5b7a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -56,6 +56,9 @@ abstract class TcpDiscoveryImpl {
     /** Response WAIT. */
     protected static final int RES_WAIT = 200;
 
+    /** Response join impossible. */
+    protected static final int RES_JOIN_IMPOSSIBLE = 255;
+
     /** */
     protected final TcpDiscoverySpi spi;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c2c22b9..d6fbc1c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,9 +1956,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     protected IgniteSpiException 
duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
         assert msg != null;
 
-        return new IgniteSpiException("Local node has the same ID as existing 
node in topology " +
-            "(fix configuration and restart local node) [localNode=" + locNode 
+
-            ", existingNode=" + msg.node() + ']');
+        StringBuilder errorMsgBldr = new StringBuilder();
+        errorMsgBldr
+            .append("Node with the same ID was found in node IDs history ")
+            .append("or existing node in topology has the same ID ")
+            .append("(fix configuration and restart local node) [localNode=")
+            .append(locNode)
+            .append(", existingNode=")
+            .append(msg.node())
+            .append(']');
+
+        return new IgniteSpiException(errorMsgBldr.toString());
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
new file mode 100644
index 0000000..ec3d970
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Collections;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Checks that a node which cannot accept incoming connections is
+ * rejected by the cluster and fails to start.
+ *
+ * Older versions would retry the connection indefinitely, yet the node
+ * would never join and, as a consequence, no exchange would ever start.
+ */
+public class TcpDiscoveryFailedJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static final int FAIL_PORT = 47503;
+
+    /** */
+    private SpiFailType failType = SpiFailType.REFUSE;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = failType == SpiFailType.REFUSE ? new 
FailTcpDiscoverySpi() : new DropTcpDiscoverySpi();
+
+        discoSpi.setLocalPort(Integer.parseInt(gridName.split("-")[1]));
+
+        TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true);
+
+        finder.setAddresses(Collections.singleton("127.0.0.1:47500..47503"));
+
+        discoSpi.setIpFinder(finder);
+        discoSpi.setNetworkTimeout(2_000);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        if (gridName.contains("client")) {
+            cfg.setClientMode(true);
+
+            discoSpi.setForceServerMode(gridName.contains("server"));
+        }
+
+        return cfg;
+    }
+
+    /** */
+    @After
+    public void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoveryRefuse() throws Exception {
+        failType = SpiFailType.REFUSE;
+
+        startGrid("server-47500");
+        startGrid("server-47501");
+        startGrid("server-47502");
+
+        assertStartFailed("server-47503");
+
+        // Client in server mode.
+        assertStartFailed("client_server-47503");
+
+        // Regular client starts normally.
+        startGrid("client-47503");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoveryDrop() throws Exception {
+        failType = SpiFailType.DROP;
+
+        startGrid("server-47500");
+        startGrid("server-47501");
+        startGrid("server-47502");
+
+        assertStartFailed("server-47503");
+
+        // Client in server mode.
+        assertStartFailed("client_server-47503");
+
+        // Regular client starts normally.
+        startGrid("client-47503");
+    }
+
+    /**
+     * @param name Name.
+     */
+    private void assertStartFailed(final String name) {
+        //noinspection ThrowableNotThrown
+        GridTestUtils.assertThrows(log, () -> {
+            startGrid(name);
+
+            return null;
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
+     *
+     */
+    private static class FailTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, 
IgniteSpiOperationTimeoutException {
+            if (sockAddr.getPort() == FAIL_PORT)
+                throw new SocketException("Connection refused");
+
+            return super.openSocket(sockAddr, timeoutHelper);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress 
remAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, 
IgniteSpiOperationTimeoutException {
+            if (remAddr.getPort() == FAIL_PORT)
+                throw new SocketException("Connection refused");
+
+            return super.openSocket(sock, remAddr, timeoutHelper);
+        }
+    }
+
+    /**
+     * Emulates a situation where the network drops packets.
+     */
+    private static class DropTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            if (sock.getPort() != FAIL_PORT)
+                super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (sock.getPort() != FAIL_PORT)
+                super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(ClusterNode node, Socket sock, 
OutputStream out,
+            TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, 
IgniteCheckedException {
+            if (sock.getPort() != FAIL_PORT)
+                super.writeToSocket(node, sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (sock.getPort() != FAIL_PORT)
+                super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage 
msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (sock.getPort() != FAIL_PORT)
+                super.writeToSocket(msg, sock, res, timeout);
+        }
+    }
+
+    /**
+     *
+     */
+    private enum SpiFailType {
+        /** */
+        REFUSE,
+
+        /** */
+        DROP
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 73c5688..a17639e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryUnresolvedHostTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryClientSuspensionSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryFailedJoinTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryCoordinatorFailureTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
@@ -133,6 +134,8 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
     IgniteClientReconnectMassiveShutdownSslTest.class,
     TcpDiscoveryClientSuspensionSelfTest.class,
 
+    TcpDiscoveryFailedJoinTest.class,
+
     // SSL.
     TcpDiscoverySslSelfTest.class,
     TcpDiscoverySslTrustedSelfTest.class,

Reply via email to