This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c843b8f IGNITE-5569 Fix TCP Discovery SPI allows multiple NODE_JOINED
/ NODE_FAILED leading to a cluster DDoS - Fixes #6108.
c843b8f is described below
commit c843b8fe0ca624231d384d9f56a9635c336c91c4
Author: Sergey Chugunov <[email protected]>
AuthorDate: Fri Feb 15 00:46:37 2019 +0300
IGNITE-5569 Fix TCP Discovery SPI allows multiple NODE_JOINED / NODE_FAILED
leading to a cluster DDoS - Fixes #6108.
Signed-off-by: Dmitriy Govorukhin <[email protected]>
---
.../org/apache/ignite/IgniteSystemProperties.java | 5 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 97 +++++++++-
.../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 3 +
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 14 +-
.../discovery/tcp/TcpDiscoveryFailedJoinTest.java | 209 +++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
6 files changed, 326 insertions(+), 5 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index cd5f496..8438968 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1117,6 +1117,11 @@ public final class IgniteSystemProperties {
public static final String IGNITE_RECOVERY_SEMAPHORE_PERMITS =
"IGNITE_RECOVERY_SEMAPHORE_PERMITS";
/**
+ * Maximum size of the history of IDs of server nodes that have ever
joined the current topology.
+ */
+ public static final String IGNITE_NODE_IDS_HISTORY_SIZE =
"IGNITE_NODE_IDS_HISTORY_SIZE";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e1577fb..d5149ee 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -152,6 +152,7 @@ import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_NODE_IDS_HISTORY_SIZE;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -278,6 +279,15 @@ class ServerImpl extends TcpDiscoveryImpl {
private IgniteFuture<?> lastCustomEvtLsnrFut;
/**
+ * Maximum size of the history of IDs of server nodes that have joined
the current topology (recorded when the node joined event is fired).
+ */
+ private static final int JOINED_NODE_IDS_HISTORY_SIZE =
getInteger(IGNITE_NODE_IDS_HISTORY_SIZE, 50);
+
+ /** History of IDs of server (non-client, non-daemon) nodes that the local node has seen join the topology. */
+ private final GridBoundedLinkedHashSet<UUID> nodesIdsHist =
+ new GridBoundedLinkedHashSet<>(JOINED_NODE_IDS_HISTORY_SIZE);
+
+ /**
* @param adapter Adapter.
*/
ServerImpl(TcpDiscoverySpi adapter) {
@@ -1116,6 +1126,8 @@ class ServerImpl extends TcpDiscoveryImpl {
return false;
boolean retry = false;
+ boolean joinImpossible = false;
+
Collection<Exception> errs = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
@@ -1154,6 +1166,11 @@ class ServerImpl extends TcpDiscoveryImpl {
// Join request sending succeeded, wait for
response from topology.
return true;
+ case RES_JOIN_IMPOSSIBLE:
+ joinImpossible = true;
+
+ break;
+
default:
// Concurrent startup, try next node.
if (res == RES_CONTINUE_JOIN) {
@@ -1185,6 +1202,13 @@ class ServerImpl extends TcpDiscoveryImpl {
noResAddrs.add(addr);
}
+
+ if (joinImpossible)
+ throw new IgniteSpiException("Impossible to continue join,
" +
+ "check if local discovery and communication ports " +
+ "are not blocked with firewall [addr=" + addr +
+ ", req=" + joinReq + ", discoLocalPort=" +
spi.getLocalPort() +
+ ", discoLocalPortRange=" + spi.getLocalPortRange() +
']');
}
if (retry) {
@@ -1295,7 +1319,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
boolean ignore = false;
- synchronized (failedNodes) {
+ synchronized (mux) {
for (TcpDiscoveryNode failedNode :
failedNodes.keySet()) {
if (failedNode.id().equals(res.creatorNodeId())) {
if (log.isDebugEnabled())
@@ -3839,6 +3863,26 @@ class ServerImpl extends TcpDiscoveryImpl {
return;
}
+ else {
+ if (!node.isClient() && !node.isDaemon()) {
+ if (nodesIdsHist.contains(node.id())) {
+ try {
+ trySendMessageDirectly(node, new
TcpDiscoveryDuplicateIdMessage(locNodeId,
+ node));
+ }
+ catch (IgniteSpiException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send duplicate ID
message to node " +
+ "[node=" + node +
+ ", err=" + e.getMessage() + ']');
+
+ onException("Failed to send duplicate ID
message to node: " + node, e);
+ }
+
+ return;
+ }
+ }
+ }
if (spi.nodeAuth != null) {
// Authenticate node first.
@@ -4438,6 +4482,13 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && !locNodeId.equals(node.id())) {
+ if (!node.isClient() && !node.isDaemon() &&
nodesIdsHist.contains(node.id())) {
+ U.warn(log, "Discarding node added message since local
node has already seen " +
+ "joining node in topology [node=" + node + ",
locNode=" + locNode + ", msg=" + msg + ']');
+
+ return;
+ }
+
if (node.internalOrder() <= ring.maxInternalOrder()) {
if (log.isDebugEnabled())
log.debug("Discarding node added message since new
node's order is less than " +
@@ -4784,9 +4835,13 @@ class ServerImpl extends TcpDiscoveryImpl {
lastMsg = msg;
}
- if (state == CONNECTED)
+ if (state == CONNECTED) {
notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+ if (!node.isClient() && !node.isDaemon())
+ nodesIdsHist.add(node.id());
+ }
+
try {
if (spi.ipFinder.isShared() && locNodeCoord &&
node.clientRouterNodeId() == null)
spi.ipFinder.registerAddresses(node.socketAddresses());
@@ -6915,6 +6970,17 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.getSocketTimeout();
if (state == CONNECTED) {
+ TcpDiscoveryNode node = msg.node();
+
+ // Check that joining node can accept incoming connections.
+ if (node.clientRouterNodeId() == null) {
+ if (!pingJoiningNode(node)) {
+ spi.writeToSocket(msg, sock, RES_JOIN_IMPOSSIBLE,
sockTimeout);
+
+ return false;
+ }
+ }
+
spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
if (log.isDebugEnabled())
@@ -6965,6 +7031,33 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /**
+ * Pings the joining node on its addresses, skipping loopback addresses shared with the local node.
+ * @param node Remote node to ping.
+ * @return {@code True} if ping was successful and {@code false} if
ping failed for any reason.
+ */
+ private boolean pingJoiningNode(TcpDiscoveryNode node) {
+ for (InetSocketAddress addr : spi.getNodeAddresses(node, false)) {
+ try {
+ if (!(addr.getAddress().isLoopbackAddress() &&
locNode.socketAddresses().contains(addr))) {
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr,
node.id(), null);
+
+ if (t != null)
+ return true;
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to ping joining node, closing
connection. [node=" + node +
+ ", err=" + e.getMessage() + ']');
+ }
+ }
+
+ U.warn(log, "Failed to ping joining node, closing connection.
[node=" + node + ']');
+
+ return false;
+ }
+
/** {@inheritDoc} */
@Override public void interrupt() {
super.interrupt();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 917e340..7e5b7a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -56,6 +56,9 @@ abstract class TcpDiscoveryImpl {
/** Response WAIT. */
protected static final int RES_WAIT = 200;
+ /** Response join impossible. */
+ protected static final int RES_JOIN_IMPOSSIBLE = 255;
+
/** */
protected final TcpDiscoverySpi spi;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c2c22b9..d6fbc1c 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,9 +1956,17 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
protected IgniteSpiException
duplicateIdError(TcpDiscoveryDuplicateIdMessage msg) {
assert msg != null;
- return new IgniteSpiException("Local node has the same ID as existing
node in topology " +
- "(fix configuration and restart local node) [localNode=" + locNode
+
- ", existingNode=" + msg.node() + ']');
+ StringBuilder errorMsgBldr = new StringBuilder();
+ errorMsgBldr
+ .append("Node with the same ID was found in node IDs history ")
+ .append("or existing node in topology has the same ID ")
+ .append("(fix configuration and restart local node) [localNode=")
+ .append(locNode)
+ .append(", existingNode=")
+ .append(msg.node())
+ .append(']');
+
+ return new IgniteSpiException(errorMsgBldr.toString());
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
new file mode 100644
index 0000000..ec3d970
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Collections;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Checks that a server node which cannot accept incoming connections
+ * is rejected during join and fails to start.
+ *
+ * Older versions would retry the connection indefinitely without the node
+ * ever joining and, as a consequence, without the exchange process starting.
+ */
+public class TcpDiscoveryFailedJoinTest extends GridCommonAbstractTest {
+ /** Discovery port to which the test SPIs refuse connections or skip writes. */
+ private static final int FAIL_PORT = 47503;
+
+ /** Failure mode that selects which test discovery SPI is configured. */
+ private SpiFailType failType = SpiFailType.REFUSE;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = failType == SpiFailType.REFUSE ? new
FailTcpDiscoverySpi() : new DropTcpDiscoverySpi();
+
+ discoSpi.setLocalPort(Integer.parseInt(gridName.split("-")[1]));
+
+ TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(true);
+
+ finder.setAddresses(Collections.singleton("127.0.0.1:47500..47503"));
+
+ discoSpi.setIpFinder(finder);
+ discoSpi.setNetworkTimeout(2_000);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ if (gridName.contains("client")) {
+ cfg.setClientMode(true);
+
+ discoSpi.setForceServerMode(gridName.contains("server"));
+ }
+
+ return cfg;
+ }
+
+ /** Stops all grids after each test. */
+ @After
+ public void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDiscoveryRefuse() throws Exception {
+ failType = SpiFailType.REFUSE;
+
+ startGrid("server-47500");
+ startGrid("server-47501");
+ startGrid("server-47502");
+
+ assertStartFailed("server-47503");
+
+ // Client forced into server mode must fail to start as well.
+ assertStartFailed("client_server-47503");
+
+ // Regular client starts normally.
+ startGrid("client-47503");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDiscoveryDrop() throws Exception {
+ failType = SpiFailType.DROP;
+
+ startGrid("server-47500");
+ startGrid("server-47501");
+ startGrid("server-47502");
+
+ assertStartFailed("server-47503");
+
+ // Client forced into server mode must fail to start as well.
+ assertStartFailed("client_server-47503");
+
+ // Regular client starts normally.
+ startGrid("client-47503");
+ }
+
+ /**
+ * @param name Name of the grid whose start is expected to fail with {@link IgniteCheckedException}.
+ */
+ private void assertStartFailed(final String name) {
+ //noinspection ThrowableNotThrown
+ GridTestUtils.assertThrows(log, () -> {
+ startGrid(name);
+
+ return null;
+ }, IgniteCheckedException.class, null);
+ }
+
+ /**
+ * Discovery SPI that fails every attempt to open a socket to {@code FAIL_PORT} with "Connection refused".
+ */
+ private static class FailTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException,
IgniteSpiOperationTimeoutException {
+ if (sockAddr.getPort() == FAIL_PORT)
+ throw new SocketException("Connection refused");
+
+ return super.openSocket(sockAddr, timeoutHelper);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress
remAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException,
IgniteSpiOperationTimeoutException {
+ if (remAddr.getPort() == FAIL_PORT)
+ throw new SocketException("Connection refused");
+
+ return super.openSocket(sock, remAddr, timeoutHelper);
+ }
+ }
+
+ /**
+ * Emulates a network that drops packets: writes to sockets connected to {@code FAIL_PORT} are silently skipped.
+ */
+ private static class DropTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException {
+ if (sock.getPort() != FAIL_PORT)
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (sock.getPort() != FAIL_PORT)
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(ClusterNode node, Socket sock,
OutputStream out,
+ TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
+ if (sock.getPort() != FAIL_PORT)
+ super.writeToSocket(node, sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (sock.getPort() != FAIL_PORT)
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage
msg, Socket sock, int res,
+ long timeout) throws IOException {
+ if (sock.getPort() != FAIL_PORT)
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+ }
+
+ /**
+ * Failure modes emulated by the test discovery SPIs.
+ */
+ private enum SpiFailType {
+ /** Opening a socket to the failing port throws "Connection refused" ({@link FailTcpDiscoverySpi}). */
+ REFUSE,
+
+ /** Writes to the failing port are silently skipped ({@link DropTcpDiscoverySpi}). */
+ DROP
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 73c5688..a17639e 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
import
org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryUnresolvedHostTest;
import
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryClientSuspensionSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryFailedJoinTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryCoordinatorFailureTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
@@ -133,6 +134,8 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
IgniteClientReconnectMassiveShutdownSslTest.class,
TcpDiscoveryClientSuspensionSelfTest.class,
+ TcpDiscoveryFailedJoinTest.class,
+
// SSL.
TcpDiscoverySslSelfTest.class,
TcpDiscoverySslTrustedSelfTest.class,