Repository: ignite Updated Branches: refs/heads/master 3a4f23bfe -> d70477b20
IGNITE-7944: Disconnected client node tries to send JOB_CANCEL message. Applied fix: - Skip sending message if client disconnected; - Throw IgniteCheckedException if a client node is disconnected and communication client is null. This closes #3737. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d70477b2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d70477b2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d70477b2 Branch: refs/heads/master Commit: d70477b20ce5e1830a3167a8913e0c0593a2d2d2 Parents: 3a4f23b Author: Roman Guseinov <gromc...@gmail.com> Authored: Mon Apr 9 14:45:44 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Mon Apr 9 14:45:44 2018 +0300 ---------------------------------------------------------------------- .../processors/task/GridTaskProcessor.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 26 +- .../TcpCommunicationSpiSkipMessageSendTest.java | 414 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 3 + 4 files changed, 442 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d70477b2/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index d27e116..2f0aa7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -171,7 +171,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements IgniteCha IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); for (GridTaskWorker<?, ?> worker : tasks.values()) - worker.finishTask(null, err); + worker.finishTask(null, err, false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d70477b2/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c5f366b..e1addd8 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -64,6 +64,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; @@ -2601,6 +2602,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isTraceEnabled()) log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); + if (isLocalNodeDisconnected()) { + throw new IgniteSpiException("Failed to send a message to remote node because local node has " + + "been disconnected [rmtNodeId=" + node.id() + ']'); + } + ClusterNode locNode = getLocalNode(); if (locNode == null) @@ -2654,6 +2660,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * @return {@code True} if local node in disconnected state. + */ + private boolean isLocalNodeDisconnected() { + boolean disconnected = false; + + if (ignite instanceof IgniteKernal) + disconnected = ((IgniteKernal)ignite).context().clientDisconnected(); + + return disconnected; + } + + /** * @param nodeId Node ID. * @param rmvClient Client to remove. * @return {@code True} if client was removed. @@ -2798,8 +2816,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati client = fut.get(); - if (client == null) - continue; + if (client == null) { + if (isLocalNodeDisconnected()) + throw new IgniteCheckedException("Unable to create TCP client due to local node disconnecting."); + else + continue; + } if (getSpiContext().node(nodeId) == null) { if (removeNodeClient(nodeId, client)) http://git-wip-us.apache.org/repos/asf/ignite/blob/d70477b2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java new file mode 100644 index 0000000..c4bc8f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.nio.GridCommunicationClient; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * Tests that the client will be segmented in time and won't hang due to canceling compute jobs. + */ +public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTest { + /** */ + private static final CountDownLatch COMPUTE_JOB_STARTED = new CountDownLatch(1); + + /** */ + private static final long FAILURE_DETECTION_TIMEOUT = 10000; + + /** */ + private static final long JOIN_TIMEOUT = 10000; + + /** */ + private static final long START_JOB_TIMEOUT = 10000; + + /** */ + private static final long DISABLE_NETWORK_DELAY = 2000; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 2 * 60 * 1000; + } + + /** + * @throws Exception If failed. + */ + public void testClientSegmented() throws Exception { + Ignite server = null; + Ignite client = null; + + try { + server = Ignition.start(getConfig(false)); + + final CountDownLatch clientDisconnected = new CountDownLatch(1); + final CountDownLatch clientSegmented = new CountDownLatch(1); + + client = startClient(clientDisconnected, clientSegmented); + + final IgniteCompute compute = client.compute(); + + runJobAsync(compute); + + if (!COMPUTE_JOB_STARTED.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS)) + fail("Compute job wasn't started."); + + disableNetwork(client); + + if (!clientDisconnected.await(FAILURE_DETECTION_TIMEOUT * 3, TimeUnit.MILLISECONDS)) + fail("Client wasn't disconnected."); + + if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS)) + fail("Client wasn't segmented."); + } + finally { + if (client != null) + client.close(); + + if (server != null) + server.close(); + } + } + + /** + * Simulate network disabling. + * + * @param ignite Ignite instance. + * @throws IgniteInterruptedCheckedException If thread sleep interrupted. + * @throws InterruptedException If waiting for network disabled failed (interrupted). + */ + private void disableNetwork(Ignite ignite) throws IgniteInterruptedCheckedException, InterruptedException { + U.sleep(DISABLE_NETWORK_DELAY); + + CustomCommunicationSpi communicationSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + CustomDiscoverySpi discoverySpi = (CustomDiscoverySpi)ignite.configuration().getDiscoverySpi(); + + discoverySpi.disableNetwork(); + + communicationSpi.disableNetwork(); + + if (!discoverySpi.awaitNetworkDisabled(FAILURE_DETECTION_TIMEOUT * 2)) + fail("Network wasn't disabled."); + } + + /** + * Start compute jobs in the separate thread. + * + * @param compute Ignite compute instance. + */ + private void runJobAsync(final IgniteCompute compute) { + new Thread(new Runnable() { + @Override public void run() { + try { + compute.call(new IgniteCallable<Integer>() { + @Override public Integer call() throws Exception { + COMPUTE_JOB_STARTED.countDown(); + + // Simulate long-running job. + new CountDownLatch(1).await(); + + return null; + } + }); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + } + + /** + * Create Communication Spi instance. + * + * @param client Is a client node. + * @return Communication Spi. + */ + private TcpCommunicationSpi getCommunicationSpi(boolean client) { + TcpCommunicationSpi spi = new CustomCommunicationSpi(client); + + spi.setName("CustomCommunicationSpi"); + + return spi; + } + + /** + * Create Discovery Spi instance. + * + * @return Discovery Spi. + */ + private TcpDiscoverySpi getDiscoverySpi() { + TcpDiscoverySpi spi = new CustomDiscoverySpi(); + + spi.setName("CustomDiscoverySpi"); + + spi.setIpFinder(LOCAL_IP_FINDER); + + return spi; + } + + /** + * Create Ignite configuration. + * + * @param clientMode Client mode. + * @return Ignite configuration. + */ + private IgniteConfiguration getConfig(boolean clientMode) { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setIgniteInstanceName(clientMode ? "client-node" : "server-node"); + + cfg.setClientMode(clientMode); + + cfg.setCommunicationSpi(getCommunicationSpi(clientMode)); + + if (!clientMode) { + cfg.setDiscoverySpi(getDiscoverySpi()); + + FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi(); + + collisionSpi.setParallelJobsNumber(1); + + cfg.setCollisionSpi(collisionSpi); + } + else { + cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); + + cfg.setDiscoverySpi(getDiscoverySpi().setJoinTimeout(JOIN_TIMEOUT)); + } + + return cfg; + } + + /** + * Start client node. + * + * @param clientDisconnected Client is disconnected. + * @param clientSegmented Client is segmented. + * @return Ignite instance. + */ + private Ignite startClient(final CountDownLatch clientDisconnected, final CountDownLatch clientSegmented) { + Ignite ignite = Ignition.start(getConfig(true)); + + IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + log.info("Client node received event: " + event.name()); + + if (event.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED) + clientDisconnected.countDown(); + + if (event.type() == EventType.EVT_NODE_SEGMENTED) + clientSegmented.countDown(); + + return true; + } + }; + + ignite.events().localListen(locLsnr, + EventType.EVT_NODE_SEGMENTED, + EventType.EVT_CLIENT_NODE_DISCONNECTED); + + return ignite; + } + + /** + * Communication Spi that emulates connection troubles. + */ + class CustomCommunicationSpi extends TcpCommunicationSpi { + /** Network is disabled. */ + private volatile boolean networkDisabled = false; + + /** Additional logging is enabled. */ + private final boolean logEnabled; + + /** + * @param enableLogs Enable additional logging. + */ + CustomCommunicationSpi(boolean enableLogs) { + super(); + this.logEnabled = enableLogs; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + String message = msg.toString(); + + if (logEnabled) + log.info("CustomCommunicationSpi.sendMessage: " + message); + + if (message.contains("TOPIC_JOB_CANCEL")) + closeTcpConnections(); + + super.sendMessage(node, msg, ackC); + } + + /** {@inheritDoc} */ + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { + if (logEnabled) + log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]", networkDisabled, node)); + + if (networkDisabled) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient()); + + long timeout = timeoutHelper.nextTimeoutChunk(getConnectTimeout()); + + if (logEnabled) + log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]"); + + sleep(timeout); + + return null; + } + else + return super.createTcpClient(node, connIdx); + } + + /** + * Simulate network disabling. + */ + void disableNetwork() { + networkDisabled = true; + } + + /** + * Close communication clients. It will lead that sendMessage method will be trying to create new ones. + */ + private void closeTcpConnections() { + final ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(this, "clients"); + + Set<UUID> ids = clients.keySet(); + + if (ids.size() > 0) { + log.info("Close TCP clients: " + ids); + + for (UUID nodeId : ids) { + GridCommunicationClient[] clients0 = clients.remove(nodeId); + + if (clients0 != null) { + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } + } + } + + log.info("TCP clients are closed."); + } + } + } + + /** + * Discovery Spi that emulates connection troubles. + */ + class CustomDiscoverySpi extends TcpDiscoverySpi { + /** Network is disabled. */ + private volatile boolean networkDisabled = false; + + /** */ + private final CountDownLatch networkDisabledLatch = new CountDownLatch(1); + + /** */ + CustomDiscoverySpi() { + super(); + + setName("CustomDiscoverySpi"); + } + + /** {@inheritDoc} */ + @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, + long timeout) throws IOException, IgniteCheckedException { + if (networkDisabled) { + sleep(timeout); + + return null; + } + else + return super.readMessage(sock, in, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (networkDisabled) { + sleep(timeout); + + networkDisabledLatch.countDown(); + + throw new IgniteCheckedException("CustomDiscoverySpi: network is disabled."); + } + else + super.writeToSocket(sock, msg, timeout); + } + + /** + * Simulate network disabling. + */ + void disableNetwork() { + networkDisabled = true; + } + + /** + * Wait until the network is disabled. + */ + boolean awaitNetworkDisabled(long timeout) throws InterruptedException { + return networkDisabledLatch.await(timeout, TimeUnit.MILLISECONDS); + } + } + + /** + * Sleeps for given number of milliseconds. + * + * @param timeout Time to sleep (2000 ms by default). + * @throws IgniteInterruptedCheckedException If current thread interrupted. + */ + static void sleep(long timeout) throws IgniteInterruptedCheckedException { + if (timeout > 0) + U.sleep(timeout); + else + U.sleep(2000); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d70477b2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 7a4de1b..1b962bc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -39,6 +39,7 @@ import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAck import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest; /** @@ -78,6 +79,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class)); + suite.addTest(new TestSuite(TcpCommunicationSpiSkipMessageSendTest.class)); + suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class)); suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class)); suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));