Repository: ignite Updated Branches: refs/heads/master 2bf36865f -> a4a66d6e2
IGNITE-4111 Send a correct response when communication tries to send a message to a not yet joined node - Fixes #4685. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4a66d6e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4a66d6e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4a66d6e Branch: refs/heads/master Commit: a4a66d6e20e2c3fda009220b53c6ee520bf3289f Parents: 2bf3686 Author: NSAmelchev <nsamelc...@gmail.com> Authored: Mon Dec 10 15:09:20 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Dec 10 15:09:20 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteFeatures.java | 101 ++++++++++++++ .../apache/ignite/internal/IgniteKernal.java | 4 + .../ignite/internal/IgniteNodeAttributes.java | 3 + .../communication/GridIoMessageFactory.java | 8 +- .../internal/util/nio/GridDirectParser.java | 6 +- .../communication/tcp/TcpCommunicationSpi.java | 86 ++++++++++-- .../tcp/messages/HandshakeWaitMessage.java | 81 ++++++++++++ .../tcp/internal/TcpDiscoveryNodesRing.java | 2 +- ...iteTcpCommunicationHandshakeWaitSslTest.java | 35 +++++ ...IgniteTcpCommunicationHandshakeWaitTest.java | 131 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 5 + 11 files changed, 449 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java new file mode 100644 index 0000000..4802dce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.BitSet; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES; + +/** + * Defines supported features and check its on other nodes. + */ +public enum IgniteFeatures { + /** + * Support of {@link HandshakeWaitMessage} by {@link TcpCommunicationSpi}. + */ + TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE(0); + + /** + * Unique feature identifier. + */ + private final int featureId; + + /** + * @param featureId Feature ID. + */ + IgniteFeatures(int featureId) { + this.featureId = featureId; + } + + /** + * @return Feature ID. + */ + public int getFeatureId() { + return featureId; + } + + /** + * Checks that feature supported by node. + * + * @param clusterNode Cluster node to check. + * @return {@code True} if feature is declared to be supported by remote node. + */ + public static boolean nodeSupports(ClusterNode clusterNode, IgniteFeatures feature) { + final byte[] features = clusterNode.attribute(ATTR_IGNITE_FEATURES); + + return features != null && BitSet.valueOf(features).get(feature.getFeatureId()); + } + + /** + * Checks that feature supported by all nodes. + * + * @param nodes cluster nodes to check their feature support. + * @return if feature is declared to be supported by all nodes + */ + public static boolean allNodesSupports(Iterable<ClusterNode> nodes, IgniteFeatures feature) { + for (ClusterNode next : nodes) { + if (!nodeSupports(next, feature)) + return false; + } + + return true; + } + + /** + * Features supported by the current node. + * + * @return Byte array representing all supported features by current node. + */ + public static byte[] allFeatures() { + final BitSet set = new BitSet(); + + for (IgniteFeatures value : IgniteFeatures.values()) { + final int featureId = value.getFeatureId(); + + assert !set.get(featureId) : "Duplicate feature ID found for [" + value + "] having same ID [" + + featureId + "]"; + + set.set(featureId); + } + + return set.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 15f3c6a..a78037b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -249,6 +249,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME; @@ -1620,6 +1621,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Save transactions configuration. add(ATTR_TX_CONFIG, cfg.getTransactionConfiguration()); + // Supported features. + add(ATTR_IGNITE_FEATURES, IgniteFeatures.allFeatures()); + // Stick in SPI versions and classes attributes. addSpiAttributes(cfg.getCollisionSpi()); addSpiAttributes(cfg.getDiscoverySpi()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 7db7fde..45ca234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -205,6 +205,9 @@ public final class IgniteNodeAttributes { /** Internal attribute indicates that incoming cache requests should be validated on primary node as well. */ public static final String ATTR_VALIDATE_CACHE_REQUESTS = ATTR_CACHE + ".validate.cache.requests"; + /** Supported features. */ + public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3f4eb18..b1c023a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -196,6 +196,7 @@ import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; @@ -348,6 +349,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE: + msg = new HandshakeWaitMessage(); + + break; + case 0: msg = new GridJobCancelRequest(); @@ -1114,7 +1120,7 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..129] [-23..-27] [-36..-55]- this + // [-3..119] [124..129] [-23..-28] [-36..-55] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL // [2048..2053] - Snapshots http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 9e423bb..152ffe6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -28,6 +28,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; + /** * Parser for direct messages. */ @@ -76,9 +78,7 @@ public class GridDirectParser implements GridNioParser { byte b0 = buf.get(); byte b1 = buf.get(); - short type = (short)((b1 & 0xFF) << 8 | b0 & 0xFF); - - msg = msgFactory.create(type); + msg = msgFactory.create(makeMessageType(b0, b1)); } boolean finished = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3e3f7f9..044ffef 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -66,6 +66,7 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteFeatures; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -145,6 +146,7 @@ import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnecti import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; import org.apache.ignite.spi.discovery.DiscoverySpi; @@ -157,6 +159,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; +import static org.apache.ignite.plugin.extensions.communication.Message.DIRECT_TYPE_SIZE; import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT; @@ -373,6 +376,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Handshake message type. */ public static final short HANDSHAKE_MSG_TYPE = -3; + /** Handshake wait message type. */ + public static final short HANDSHAKE_WAIT_MSG_TYPE = -28; + /** */ private ConnectGateway connectGate; @@ -408,11 +414,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati log.info("Accepted incoming communication connection [locAddr=" + ses.localAddress() + ", rmtAddr=" + ses.remoteAddress() + ']'); - if (log.isDebugEnabled()) - log.debug("Sending local node ID to newly accepted session: " + ses); - try { - ses.sendNoFuture(nodeIdMessage(), null); + if (ctxInitLatch.getCount() == 0 || !isHandshakeWaitSupported()) { + if (log.isDebugEnabled()) + log.debug("Sending local node ID to newly accepted session: " + ses); + + ses.sendNoFuture(nodeIdMessage(), null); + } + else { + if (log.isDebugEnabled()) + log.debug("Sending handshake wait message to newly accepted session: " + ses); + + ses.sendNoFuture(new HandshakeWaitMessage(), null); + } } catch (IgniteCheckedException e) { U.error(log, "Failed to send message: " + e, e); @@ -2309,12 +2323,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati }; GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() { + private IgniteSpiContext context; + private MessageFormatter formatter; @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException { - if (formatter == null) - formatter = getSpiContext().messageFormatter(); + final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext(); + + if (formatter == null || context != ctx) { + context = ctx; + + formatter = context.messageFormatter(); + } assert formatter != null; @@ -2325,11 +2346,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati }; GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() { + private IgniteSpiContext context; + private MessageFormatter formatter; @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException { - if (formatter == null) - formatter = getSpiContext().messageFormatter(); + final IgniteSpiContext ctx = TcpCommunicationSpi.super.getSpiContext(); + + if (formatter == null || context != ctx) { + context = ctx; + + formatter = context.messageFormatter(); + } assert formatter != null; @@ -3654,6 +3682,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ByteBuffer handBuff = sslHnd.applicationBuffer(); + if (handBuff.remaining() >= DIRECT_TYPE_SIZE) { + short msgType = makeMessageType(handBuff.get(0), handBuff.get(1)); + + if (msgType == HANDSHAKE_WAIT_MSG_TYPE) + return NEED_WAIT; + } + if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) { buf = ByteBuffer.allocate(1000); @@ -3665,6 +3700,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati buf.flip(); buf = sslHnd.decode(buf); + + if (handBuff.remaining() >= DIRECT_TYPE_SIZE) { + short msgType = makeMessageType(handBuff.get(0), handBuff.get(1)); + + if (msgType == HANDSHAKE_WAIT_MSG_TYPE) + return NEED_WAIT; + } } else buf = handBuff; @@ -3678,6 +3720,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (read == -1) throw new HandshakeException("Failed to read remote node ID (connection closed)."); + if (read >= DIRECT_TYPE_SIZE) { + short msgType = makeMessageType(buf.get(0), buf.get(1)); + + if (msgType == HANDSHAKE_WAIT_MSG_TYPE) + return NEED_WAIT; + } + i += read; } } @@ -4036,6 +4085,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati return this; } + /** + * Checks whether remote nodes support {@link HandshakeWaitMessage}. + * + * @return {@code True} if remote nodes support {@link HandshakeWaitMessage}. + */ + private boolean isHandshakeWaitSupported() { + Collection<ClusterNode> nodes = ignite().configuration().getDiscoverySpi().getRemoteNodes(); + + return IgniteFeatures.allNodesSupports(nodes, IgniteFeatures.TCP_COMMUNICATION_SPI_HANDSHAKE_WAIT_MESSAGE); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); @@ -4136,6 +4196,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * Concatenates the two parameter bytes to form a message type value. + * + * @param b0 The first byte. + * @param b1 The second byte. + */ + public static short makeMessageType(byte b0, byte b1) { + return (short)((b1 & 0xFF) << 8 | b0 & 0xFF); + } + + /** * @param ignite Ignite. */ private static WorkersRegistry getWorkersRegistry(Ignite ignite) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java new file mode 100644 index 0000000..ae29d99 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeWaitMessage.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.messages; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE; + +/** + * Message requesting to wait until node's SPI context initialize. + */ +public class HandshakeWaitMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = DIRECT_TYPE_SIZE; + + /** + * Default constructor required by {@link Message}. + */ + public HandshakeWaitMessage() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return HANDSHAKE_WAIT_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeWaitMessage.class, this); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index 3a8ded7..e0a067f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -162,7 +162,7 @@ public class TcpDiscoveryNodesRing { * @return Collection of visible remote nodes. */ public Collection<TcpDiscoveryNode> visibleRemoteNodes() { - return nodes(VISIBLE_NODES, F.remoteNodes(locNode.id())); + return nodes(F.remoteNodes(locNode.id()), VISIBLE_NODES); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitSslTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitSslTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitSslTest.java new file mode 100644 index 0000000..01a6fa4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitSslTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Testing {@link IgniteTcpCommunicationHandshakeWaitTest} with SSL enabled. + */ +public class IgniteTcpCommunicationHandshakeWaitSslTest extends IgniteTcpCommunicationHandshakeWaitTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSslContextFactory(GridTestUtils.sslFactory()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java new file mode 100644 index 0000000..f48f364 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Testing {@link TcpCommunicationSpi} that will send the wait handshake message on received connections until SPI + * context initialized. + */ +public class IgniteTcpCommunicationHandshakeWaitTest extends GridCommonAbstractTest { + /** */ + private static final long COMMUNICATION_TIMEOUT = 1000; + + /** */ + private static final long DISCOVERY_MESSAGE_DELAY = 500; + + /** */ + private final AtomicBoolean slowNet = new AtomicBoolean(); + + /** */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi discoSpi = new SlowTcpDiscoverySpi(); + + cfg.setDiscoverySpi(discoSpi); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setConnectTimeout(COMMUNICATION_TIMEOUT); + commSpi.setMaxConnectTimeout(COMMUNICATION_TIMEOUT); + commSpi.setReconnectCount(1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** + * Test that joining node will send the wait handshake message on received connections until SPI context + * initialized. + * + * @throws Exception If failed. + */ + public void testHandshakeOnNodeJoining() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true"); + + IgniteEx ignite = startGrid("srv1"); + + startGrid("srv2"); + + slowNet.set(true); + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + latch.await(2 * COMMUNICATION_TIMEOUT, TimeUnit.MILLISECONDS); + + Collection<ClusterNode> nodes = ignite.context().discovery().aliveServerNodes(); + + assertEquals(3, nodes.size()); + + return ignite.context().io().sendIoTest(new ArrayList<>(nodes), null, true).get(); + }); + + startGrid("srv3"); + + fut.get(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL); + } + + /** */ + private class SlowTcpDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) { + if (slowNet.get() && msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + try { + if (igniteInstanceName.contains("srv2") && msg.verified()) + latch.countDown(); + + U.sleep(DISCOVERY_MESSAGE_DELAY); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } + } + + return super.ensured(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4a66d6e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 50c0412..3bce671 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -35,6 +35,8 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiStartStopS import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpFailureDetectionSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpNoDelayOffSelfTest; import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTest; +import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWaitSslTest; +import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWaitTest; import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest; @@ -87,6 +89,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpCommunicationStatisticsTest.class)); + suite.addTest(new TestSuite(IgniteTcpCommunicationHandshakeWaitTest.class)); + suite.addTest(new TestSuite(IgniteTcpCommunicationHandshakeWaitSslTest.class)); + //suite.addTest(new TestSuite(GridCacheDhtLockBackupSelfTest.class)); return suite;