This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch ignite-2.9 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit bd13defe2995f06cb8ffd35782db073257aca183 Author: ibessonov <[email protected]> AuthorDate: Tue Sep 1 15:17:32 2020 +0300 IGNITE-13013 Added ability to avoid opening of server socket on client nodes. - Fixes #8199. Signed-off-by: Aleksey Plekhanov <[email protected]> --- .../managers/communication/GridIoManager.java | 53 +++++------ .../managers/communication/GridIoMessage.java | 15 +-- .../communication/GridIoSecurityAwareMessage.java | 7 +- .../rest/protocols/GridRestProtocolAdapter.java | 2 +- .../apache/ignite/internal/util/IgniteUtils.java | 30 +++--- .../spi/communication/tcp/TcpCommunicationSpi.java | 36 +++++-- .../TcpInverseConnectionResponseMessage.java | 7 +- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../discovery/tcp/internal/TcpDiscoveryNode.java | 1 - .../multicast/TcpDiscoveryMulticastIpFinder.java | 2 +- .../datastreamer/DataStreamerImplSelfTest.java | 4 +- .../tcp/GridSandboxedClientWithoutNetworkTest.java | 105 +++++++++++++++++++++ ...unicationInverseConnectionEstablishingTest.java | 25 ++--- .../tcp/GridTotallyUnreachableClientTest.java | 101 ++++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 6 +- 15 files changed, 301 insertions(+), 95 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 25581ce..278ed68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -172,7 +172,6 @@ import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATI import static org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable.traceName; import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.ATTR_PAIRED_CONN; -import static org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV; /** @@ -2015,8 +2014,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa PUBLIC_POOL, false, 0, - false, - UNDEFINED_CONNECTION_INDEX + false ); try { @@ -2061,8 +2059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa long timeout, boolean skipOnTimeout, IgniteInClosure<IgniteException> ackC, - boolean async, - int connIdx + boolean async ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -2073,8 +2070,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try (TraceSurroundings ignored = support(null)) { MTC.span().addLog(() -> "Create communication msg - " + traceName(msg)); - GridIoMessage ioMsg = createGridIoMessage(topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, - connIdx); + GridIoMessage ioMsg = createGridIoMessage(topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); if (locNodeId.equals(node.id())) { @@ -2137,8 +2133,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, boolean ordered, long timeout, - boolean skipOnTimeout, - int connIdx + boolean skipOnTimeout ) { if (ctx.security().enabled()) { UUID secSubjId = null; @@ -2148,10 +2143,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!locNodeId.equals(curSecSubjId)) secSubjId = curSecSubjId; - return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx); + return new GridIoSecurityAwareMessage(secSubjId, plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); } - return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx); + return new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); } /** @@ -2185,7 +2180,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new ClusterTopologyCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -2197,7 +2192,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -2209,7 +2204,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, -1, msg, plc, false, 0, false, null, false); } /** @@ -2222,7 +2217,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc, Span span) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -2235,7 +2230,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topicOrd, msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topicOrd, msg, plc, false, 0, false, null, false); } /** @@ -2257,7 +2252,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -2274,7 +2269,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false); } /** @@ -2301,7 +2296,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa for (ClusterNode node : nodes) { try { - send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false); } catch (IgniteCheckedException e) { if (err == null) @@ -2332,7 +2327,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa for (ClusterNode node : nodes) { try { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } catch (IgniteCheckedException e) { if (err == null) @@ -2367,7 +2362,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false, UNDEFINED_CONNECTION_INDEX); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } /** @@ -2442,8 +2437,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa 0, false, null, - async, - UNDEFINED_CONNECTION_INDEX + async ); } else { @@ -2467,8 +2461,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa 0, false, null, - async, - UNDEFINED_CONNECTION_INDEX + async ); } } @@ -4396,16 +4389,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!locNodeId.equals(msg.receiverNodeId())) return; + int connIdx = msg.connectionIndex(); + if (log.isInfoEnabled()) - log.info("Received inverse communication request from " + snd + " for connection index " - + msg.connectionIndex()); + log.info("Received inverse communication request from " + snd + " for connection index " + connIdx); TcpCommunicationSpi tcpCommSpi = getTcpCommunicationSpi(); assert !isPairedConnection(snd, tcpCommSpi); - int connIdx = msg.connectionIndex(); - responseSendService.submit(() -> { try { send(snd, @@ -4417,8 +4409,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa 0, false, null, - false, - connIdx + false ); } catch (IgniteCheckedException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 3fc5e63..72c2e2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -29,13 +29,12 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage; import org.jetbrains.annotations.Nullable; /** * Wrapper for all grid messages. */ -public class GridIoMessage implements TcpConnectionIndexAwareMessage, SpanTransport { +public class GridIoMessage implements Message, SpanTransport { /** */ public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE; @@ -68,9 +67,6 @@ public class GridIoMessage implements TcpConnectionIndexAwareMessage, SpanTransp /** Message. */ private Message msg; - /** */ - private transient int connIdx = UNDEFINED_CONNECTION_INDEX; - /** Serialized span */ private byte[] span; @@ -98,8 +94,7 @@ public class GridIoMessage implements TcpConnectionIndexAwareMessage, SpanTransp Message msg, boolean ordered, long timeout, - boolean skipOnTimeout, - int connIdx + boolean skipOnTimeout ) { assert topic != null; assert topicOrd <= Byte.MAX_VALUE; @@ -112,7 +107,6 @@ public class GridIoMessage implements TcpConnectionIndexAwareMessage, SpanTransp this.ordered = ordered; this.timeout = timeout; this.skipOnTimeout = skipOnTimeout; - this.connIdx = connIdx; } /** @@ -178,11 +172,6 @@ public class GridIoMessage implements TcpConnectionIndexAwareMessage, SpanTransp return skipOnTimeout; } - /** {@inheritDoc} */ - @Override public int connectionIndex() { - return connIdx; - } - /** * @return {@code True} if message is ordered, {@code false} otherwise. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java index 3c3c13e..8e0f0f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoSecurityAwareMessage.java @@ -23,7 +23,6 @@ import java.util.UUID; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; /** * @@ -55,7 +54,6 @@ public class GridIoSecurityAwareMessage extends GridIoMessage { * @param ordered Message ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. - * @param connIdx Desired {@link TcpCommunicationSpi} connection index if applicable. */ public GridIoSecurityAwareMessage( UUID secSubjId, @@ -65,10 +63,9 @@ public class GridIoSecurityAwareMessage extends GridIoMessage { Message msg, boolean ordered, long timeout, - boolean skipOnTimeout, - int connIdx + boolean skipOnTimeout ) { - super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout, connIdx); + super(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); this.secSubjId = secSubjId; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java index 0e9212b..76c6926 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/GridRestProtocolAdapter.java @@ -164,7 +164,7 @@ public abstract class GridRestProtocolAdapter implements GridRestProtocol { ) : Collections.<IgniteBiTuple<String, Object>>emptyList(); } - catch (IgniteCheckedException | IOException ignored) { + catch (IOException ignored) { return null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index a8e4f5f..a6ec582 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -302,7 +302,7 @@ import static org.apache.ignite.internal.util.GridUnsafe.staticFieldOffset; /** * Collection of utility methods used throughout the system. */ -@SuppressWarnings({"UnusedReturnValue", "RedundantStringConstructorCall"}) +@SuppressWarnings({"UnusedReturnValue"}) public abstract class IgniteUtils { /** */ public static final long MB = 1024L * 1024; @@ -514,6 +514,9 @@ public abstract class IgniteUtils { /** Cached local host address to make sure that every time the same local host is returned. */ private static InetAddress locHost; + /** Supplier of network interfaces. Could be used for tests purposes, must not be changed in production code. */ + public static InterfaceSupplier INTERFACE_SUPPLIER = NetworkInterface::getNetworkInterfaces; + /** */ static volatile long curTimeMillis = System.currentTimeMillis(); @@ -2059,7 +2062,7 @@ public abstract class IgniteUtils { try { InetAddress inetAddr = InetAddress.getByName(addr); - for (NetworkInterface itf : asIterable(NetworkInterface.getNetworkInterfaces())) + for (NetworkInterface itf : asIterable(INTERFACE_SUPPLIER.getInterfaces())) for (InetAddress itfAddr : asIterable(itf.getInetAddresses())) if (itfAddr.equals(inetAddr)) return itf.getDisplayName(); @@ -2170,10 +2173,9 @@ public abstract class IgniteUtils { * @param locAddr Local address to resolve. * @return Resolved available addresses of given local address. * @throws IOException If failed. - * @throws IgniteCheckedException If no network interfaces found. */ public static IgniteBiTuple<Collection<String>, Collection<String>> resolveLocalAddresses(InetAddress locAddr) - throws IOException, IgniteCheckedException { + throws IOException { return resolveLocalAddresses(locAddr, false); } @@ -2186,10 +2188,9 @@ public abstract class IgniteUtils { * @param allHostNames If {@code true} then include host names for all addresses. * @return Resolved available addresses and host names of given local address. * @throws IOException If failed. - * @throws IgniteCheckedException If no network interfaces found. */ public static IgniteBiTuple<Collection<String>, Collection<String>> resolveLocalAddresses(InetAddress locAddr, - boolean allHostNames) throws IOException, IgniteCheckedException { + boolean allHostNames) throws IOException { assert locAddr != null; Collection<String> addrs = new ArrayList<>(); @@ -2202,7 +2203,7 @@ public abstract class IgniteUtils { if (res == null) { List<InetAddress> locAddrs = new ArrayList<>(); - for (NetworkInterface itf : asIterable(NetworkInterface.getNetworkInterfaces())) { + for (NetworkInterface itf : asIterable(INTERFACE_SUPPLIER.getInterfaces())) { for (InetAddress addr : asIterable(itf.getInetAddresses())) { if (!addr.isLinkLocalAddress()) locAddrs.add(addr); @@ -2215,7 +2216,7 @@ public abstract class IgniteUtils { addresses(addr, addrs, hostNames, allHostNames); if (F.isEmpty(addrs)) - throw new IgniteCheckedException("No network addresses found (is networking enabled?)."); + return F.t(Collections.emptyList(), Collections.emptyList()); res = F.t(addrs, hostNames); @@ -2288,7 +2289,7 @@ public abstract class IgniteUtils { else { List<NetworkInterface> itfs = new ArrayList<>(); - for (NetworkInterface itf : asIterable(NetworkInterface.getNetworkInterfaces())) + for (NetworkInterface itf : asIterable(INTERFACE_SUPPLIER.getInterfaces())) itfs.add(itf); Collections.sort(itfs, new Comparator<NetworkInterface>() { @@ -2385,7 +2386,7 @@ public abstract class IgniteUtils { List<String> ips = new ArrayList<>(4); try { - Enumeration<NetworkInterface> itfs = NetworkInterface.getNetworkInterfaces(); + Enumeration<NetworkInterface> itfs = INTERFACE_SUPPLIER.getInterfaces(); if (itfs != null) { for (NetworkInterface itf : asIterable(itfs)) { @@ -2450,7 +2451,7 @@ public abstract class IgniteUtils { List<String> macs = new ArrayList<>(3); try { - Enumeration<NetworkInterface> itfs = NetworkInterface.getNetworkInterfaces(); + Enumeration<NetworkInterface> itfs = INTERFACE_SUPPLIER.getInterfaces(); if (itfs != null) { for (NetworkInterface itf : asIterable(itfs)) { @@ -12030,4 +12031,11 @@ public abstract class IgniteUtils { return sb.toString(); } } + + /** Explicit class for {@code Supplier<Enumeration<NetworkInterface>>}. */ + @FunctionalInterface + public interface InterfaceSupplier { + /** Return collection of local network interfaces. */ + Enumeration<NetworkInterface> getInterfaces() throws SocketException; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 59f22ae..b5be8b0 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteTooManyOpenFilesException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; @@ -468,6 +469,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati public static final String RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC = "Total number of messages received by current node from the given node"; + /** Client nodes might have port {@code 0} if they have no server socket opened. */ + public static final Integer DISABLED_CLIENT_PORT = 0; + /** */ private ConnectGateway connectGate; @@ -2308,7 +2312,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); - assertParameter(locPort > 1023, "locPort > 1023"); + if (Boolean.TRUE.equals(ignite.configuration().isClientMode())) + assertParameter(locPort > 1023 || locPort == -1, "locPort > 1023 || locPort == -1"); + else + assertParameter(locPort > 1023, "locPort > 1023"); + assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0"); @@ -2375,7 +2383,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati throw new IgniteSpiException("Failed to initialize TCP server: " + locHost, e); } - boolean forceClientToSrvConnections = forceClientToServerConnections(); + boolean forceClientToSrvConnections = forceClientToServerConnections() || locPort == -1; if (usePairedConnections && forceClientToSrvConnections) { throw new IgniteSpiException("Node using paired connections " + @@ -2386,6 +2394,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati try { IgniteBiTuple<Collection<String>, Collection<String>> addrs = U.resolveLocalAddresses(locHost); + if (locPort != -1 && addrs.get1().isEmpty() && addrs.get2().isEmpty()) + throw new IgniteCheckedException("No network addresses found (is networking enabled?)."); + Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort); @@ -2397,7 +2408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati res.put(createSpiAttributeName(ATTR_ADDRS), addrs.get1()); res.put(createSpiAttributeName(ATTR_HOST_NAMES), setEmptyHostNamesAttr ? emptyList() : addrs.get2()); - res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort); + res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort == -1 ? DISABLED_CLIENT_PORT : boundTcpPort); res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null); res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections); @@ -2495,7 +2506,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** {@inheritDoc} } */ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); + if (boundTcpPort > 0) + spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); // SPI can start without shmem port. if (boundTcpShmemPort > 0) @@ -2541,7 +2553,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati IgniteCheckedException lastEx = null; // If configured TCP port is busy, find first available in range. - int lastPort = locPortRange == 0 ? locPort : locPort + locPortRange - 1; + int lastPort = locPort == -1 ? -1 + : locPortRange == 0 ? locPort : locPort + locPortRange - 1; for (int port = locPort; port <= lastPort; port++) { try { @@ -2983,8 +2996,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati int connIdx; - if (msg instanceof TcpConnectionIndexAwareMessage) { - int msgConnIdx = ((TcpConnectionIndexAwareMessage)msg).connectionIndex(); + Message connIdxMsg = msg instanceof GridIoMessage ? ((GridIoMessage)msg).message() : msg; + + if (connIdxMsg instanceof TcpConnectionIndexAwareMessage) { + int msgConnIdx = ((TcpConnectionIndexAwareMessage)connIdxMsg).connectionIndex(); connIdx = msgConnIdx == UNDEFINED_CONNECTION_INDEX ? connPlc.connectionIndex() : msgConnIdx; } @@ -3128,6 +3143,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati assert node != null; assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; + if (getLocalNode().isClient()) { + if (node.isClient()) { + if (DISABLED_CLIENT_PORT.equals(node.attribute(createSpiAttributeName(ATTR_PORT)))) + throw new IgniteSpiException("Cannot send message to the client node with no server socket opened."); + } + } + UUID nodeId = node.id(); while (true) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java index 388d5de..29d1040 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpInverseConnectionResponseMessage.java @@ -19,7 +19,6 @@ package org.apache.ignite.spi.communication.tcp.internal; import java.nio.ByteBuffer; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -30,7 +29,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; * The main purpose of this message is to communicate back to server node connection index of a thread waiting for * establishing of communication connection. */ -public class TcpInverseConnectionResponseMessage implements Message { +public class TcpInverseConnectionResponseMessage implements TcpConnectionIndexAwareMessage { /** */ private static final long serialVersionUID = 0L; @@ -46,8 +45,8 @@ public class TcpInverseConnectionResponseMessage implements Message { this.connIdx = connIdx; } - /** */ - public int connectionIndex() { + /** {@inheritDoc} */ + @Override public int connectionIndex() { return connIdx; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6f9ad5b..40e106e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1153,7 +1153,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery try { addrs = U.resolveLocalAddresses(locHost); } - catch (IOException | IgniteCheckedException e) { + catch (IOException e) { throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 325a1691..97edb64 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -182,7 +182,6 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite Serializable consistentId) { assert id != null; - assert !F.isEmpty(addrs); assert metricsProvider != null; assert ver != null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index d33f9bc..b1c85a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -464,7 +464,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { try { locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); } - catch (IOException | IgniteCheckedException e) { + catch (IOException e) { throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index 650ea1a..79dba60 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -60,7 +60,6 @@ import org.junit.Test; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.spi.communication.tcp.internal.TcpConnectionIndexAwareMessage.UNDEFINED_CONNECTION_INDEX; /** * Tests for {@code IgniteDataStreamerImpl}. @@ -610,8 +609,7 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { appMsg, GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"), ioMsg.timeout(), - ioMsg.skipOnTimeout(), - UNDEFINED_CONNECTION_INDEX + ioMsg.skipOnTimeout() ); needStaleTop = false; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridSandboxedClientWithoutNetworkTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridSandboxedClientWithoutNetworkTest.java new file mode 100644 index 0000000..0156030 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridSandboxedClientWithoutNetworkTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.net.NetworkInterface; +import java.util.Collections; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Tests for communication over discovery feature (inverse communication request). + */ +public class GridSandboxedClientWithoutNetworkTest extends GridCommonAbstractTest { + /** */ + private int locPort; + + /** */ + private boolean useAnyLocAddress; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + locPort = TcpCommunicationSpi.DFLT_PORT; + useAnyLocAddress = false; + IgniteUtils.INTERFACE_SUPPLIER = NetworkInterface::getNetworkInterfaces; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + IgniteUtils.INTERFACE_SUPPLIER = NetworkInterface::getNetworkInterfaces; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureDetectionTimeout(8_000); + + TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi() + .setLocalPort(locPort); + + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); + + if (useAnyLocAddress) { + communicationSpi.setLocalAddress("0.0.0.0"); + discoverySpi.setLocalAddress("0.0.0.0"); + } + + cfg.setCommunicationSpi(communicationSpi); + cfg.setDiscoverySpi(discoverySpi); + + return cfg; + } + + /** + * Test that you can't send anything from client to another client that has "-1" local port. + * + * @throws Exception If failed. + */ + @Test + public void testNodeWithoutNetwork() throws Exception { + IgniteEx srv = startGrid(0); + + IgniteUtils.INTERFACE_SUPPLIER = Collections::emptyEnumeration; + + locPort = -1; + + useAnyLocAddress = true; + + IgniteEx client1 = startClientGrid(1); + + IgniteCache<String, String> testCache = client1.getOrCreateCache("test"); + + testCache.put("test", "test"); + + assertEquals("test", srv.getOrCreateCache("test").get("test")); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java index ecc56e3..e2e4ebf 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationInverseConnectionEstablishingTest.java @@ -83,9 +83,6 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC private static final int SRVS_NUM = 2; /** */ - private boolean clientMode; - - /** */ private boolean forceClientToSrvConnections; /** */ @@ -96,6 +93,8 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC super.beforeTest(); stopAllGrids(); + + forceClientToSrvConnections = false; } /** {@inheritDoc} */ @@ -111,7 +110,10 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC cfg.setFailureDetectionTimeout(8_000); - cfg.setCommunicationSpi(new TestCommunicationSpi()); + cfg.setCommunicationSpi( + new TestCommunicationSpi() + .setForceClientToServerConnections(forceClientToSrvConnections) + ); if (ccfg != null) { cfg.setCacheConfiguration(ccfg); @@ -119,9 +121,6 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC ccfg = null; } - if (clientMode) - cfg.setClientMode(true); - return cfg; } @@ -260,10 +259,9 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC }); } - clientMode = true; this.forceClientToSrvConnections = forceClientToSrvConnections; - startGrid(SRVS_NUM); + startClientGrid(SRVS_NUM); putAndCheckKey(); @@ -294,10 +292,9 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC return cfg.setGridLogger(log); }); - clientMode = true; forceClientToSrvConnections = false; - IgniteEx client = startGrid(SRVS_NUM); + IgniteEx client = startClientGrid(SRVS_NUM); ClusterNode clientNode = client.localNode(); IgniteEx srv = grid(SRVS_NUM - 1); @@ -370,11 +367,7 @@ public class GridTcpCommunicationInverseConnectionEstablishingTest extends GridC } /** */ - private class TestCommunicationSpi extends TcpCommunicationSpi { - { - setForceClientToServerConnections(forceClientToSrvConnections); - } - + private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { if (node.isClient()) { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTotallyUnreachableClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTotallyUnreachableClientTest.java new file mode 100644 index 0000000..333048c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTotallyUnreachableClientTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Tests for communication over discovery feature (inverse communication request). + */ +public class GridTotallyUnreachableClientTest extends GridCommonAbstractTest { + /** */ + private boolean forceClientToSrvConnections; + + /** */ + private int locPort; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + forceClientToSrvConnections = false; + locPort = TcpCommunicationSpi.DFLT_PORT; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setFailureDetectionTimeout(8_000); + + cfg.setCommunicationSpi( + new TcpCommunicationSpi() + .setForceClientToServerConnections(forceClientToSrvConnections) + .setLocalPort(locPort) + ); + + return cfg; + } + + /** + * Test that you can't send anything from client to another client that has "-1" local port. + * + * @throws Exception If failed. + */ + @Test + public void testTotallyUnreachableClient() throws Exception { + IgniteEx srv = startGrid(0); + + locPort = -1; + IgniteEx client1 = startClientGrid(1); + ClusterNode clientNode1 = client1.localNode(); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> + srv.context().io().sendIoTest(clientNode1, new byte[10], false).get() + ); + + fut.get(30, TimeUnit.SECONDS); + + locPort = TcpCommunicationSpi.DFLT_PORT; + + IgniteEx client2 = startClientGrid(2); + + GridTestUtils.assertThrowsAnyCause(log, () -> { + return GridTestUtils.runAsync(() -> + client2.context().io().sendIoTest(clientNode1, new byte[10], false).get() + ).get(30, TimeUnit.SECONDS); + }, IgniteSpiException.class, "Cannot send"); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index ca62134..ab1bb87 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import org.apache.ignite.spi.communication.tcp.GridSandboxedClientWithoutNetworkTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationInverseConnectionEstablishingTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiConcurrentConnectSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiConcurrentConnectSslSelfTest; @@ -36,6 +37,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiStartStopS import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpFailureDetectionSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpNoDelayOffSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTest; +import org.apache.ignite.spi.communication.tcp.GridTotallyUnreachableClientTest; import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationConnectOnInitTest; import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWaitSslTest; import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWaitTest; @@ -103,7 +105,9 @@ import org.junit.runners.Suite; TcpCommunicationSpiMultiJvmTest.class, TooManyOpenFilesTcpCommunicationSpiTest.class, - GridTcpCommunicationInverseConnectionEstablishingTest.class + GridTcpCommunicationInverseConnectionEstablishingTest.class, + GridTotallyUnreachableClientTest.class, + GridSandboxedClientWithoutNetworkTest.class //GridCacheDhtLockBackupSelfTest.class, })
