ignite-5566 TcpCommunicationSpi: optimized stop procedure - do not close connections in onContextDestroyed0 (it is called before discovery stops, so remote nodes still consider the node alive and try to reconnect) - once stopping has started, reply to newly opened connections with a special message so that remote nodes do not retry reconnecting
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/039c02a6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/039c02a6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/039c02a6 Branch: refs/heads/ignite-2.1.2-exchange Commit: 039c02a67eb880af7550c79c6c5a92e1a589efa4 Parents: 63debcd Author: sboikov <[email protected]> Authored: Fri Jun 23 14:03:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Jun 23 14:03:52 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridJobSiblingImpl.java | 34 ++-- .../managers/communication/GridIoManager.java | 116 ++++++------ .../deployment/GridDeploymentCommunication.java | 6 + .../eventstorage/GridEventStorageManager.java | 5 + .../processors/cache/GridCacheIoManager.java | 135 +++---------- .../cache/binary/BinaryMetadataTransport.java | 5 + .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 14 +- .../query/GridCacheDistributedQueryFuture.java | 19 +- .../query/GridCacheDistributedQueryManager.java | 18 +- .../cache/transactions/IgniteTxManager.java | 8 + .../continuous/GridContinuousProcessor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 5 + .../internal/processors/igfs/IgfsContext.java | 15 -- .../processors/job/GridJobProcessor.java | 3 +- .../internal/processors/job/GridJobWorker.java | 2 +- .../GridMarshallerMappingProcessor.java | 5 + .../processors/task/GridTaskProcessor.java | 3 +- .../processors/task/GridTaskWorker.java | 6 +- .../communication/tcp/TcpCommunicationSpi.java | 55 ++++-- .../managers/IgniteDiagnosticMessagesTest.java | 17 +- .../communication/GridIoManagerSelfTest.java | 2 +- .../IgniteRejectConnectOnNodeStopTest.java | 188 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 3 + .../query/h2/twostep/GridMapQueryExecutor.java | 2 +- 25 files changed, 412 insertions(+), 258 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java index 79ac416..9c7f548 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java @@ -24,8 +24,10 @@ import java.io.ObjectOutput; import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobSibling; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -145,31 +147,18 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable { @Override public void cancel() { GridTaskSessionImpl ses = ctx.session().getSession(sesId); - if (ses == null) { - Collection<ClusterNode> nodes = ctx.discovery().remoteNodes(); + Collection<ClusterNode> nodes = ses == null ? 
ctx.discovery().remoteNodes() : ctx.discovery().nodes(ses.getTopology()); - if (!nodes.isEmpty()) { + for (ClusterNode node : nodes) { + if (!ctx.localNodeId().equals(node.id())) { try { - ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL); + ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - // Cancel local jobs directly. - ctx.job().cancelJob(sesId, jobId, false); + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = ctx.log(GridJobSiblingImpl.class); - return; - } - - for (ClusterNode node : ctx.discovery().nodes(ses.getTopology())) { - if (ctx.localNodeId().equals(node.id())) - // Cancel local jobs directly. - ctx.job().cancelJob(ses.getId(), jobId, false); - else { - try { - ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL); + if (log.isDebugEnabled()) + log.debug("Failed to send cancel request, node left [nodeId=" + node.id() + ", ses=" + ses + ']'); } catch (IgniteCheckedException e) { // Avoid stack trace for left nodes. @@ -179,6 +168,9 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable { } } } + + // Cancel local jobs directly. 
+ ctx.job().cancelJob(sesId, jobId, false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 81692da..a1ddaf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -54,9 +54,11 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -1567,6 +1569,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @param nodeId Node ID. + * @param sndErr Send error. + * @param ping {@code True} if try ping node. + * @return {@code True} if node left. + * @throws IgniteClientDisconnectedCheckedException If ping failed. 
+ */ + public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping) + throws IgniteClientDisconnectedCheckedException + { + return sndErr instanceof ClusterTopologyCheckedException || + ctx.discovery().node(nodeId) == null || + (ping && !ctx.discovery().pingNode(nodeId)); + } + + /** * @param node Destination node. * @param topic Topic to send the message to. * @param topicOrd GridTopic enumeration ordinal. @@ -1628,6 +1645,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa getSpi().sendMessage(node, ioMsg); } catch (IgniteSpiException e) { + if (e.getCause() instanceof ClusterTopologyCheckedException) + throw (ClusterTopologyCheckedException)e.getCause(); + throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + "TCP connection cannot be established due to firewall issues) " + "[node=" + node + ", topic=" + topic + @@ -1767,7 +1787,22 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout); + IgniteCheckedException err = null; + + for (ClusterNode node : nodes) { + try { + send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false); + } + catch (IgniteCheckedException e) { + if (err == null) + err = e; + else + err.addSuppressed(e); + } + } + + if (err != null) + throw err; } /** @@ -1783,7 +1818,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc ) throws IgniteCheckedException { - send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false); + assert F.find(nodes, null, F.localNode(locNodeId)) == null : + "Internal Ignite code should never call the method with local node in a node list."; + + IgniteCheckedException err = null; + + for (ClusterNode node : nodes) { + try { + send(node, topic, topic.ordinal(), 
msg, plc, false, 0, false, null, false); + } + catch (IgniteCheckedException e) { + if (err == null) + err = e; + else + err.addSuppressed(e); + } + } + + if (err != null) + throw err; } /** @@ -1815,17 +1868,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * * @param nodes Destination nodes. * @param msg Message to send. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException { - sendUserMessage(nodes, msg, null, false, 0, false); - } - - /** - * Sends a peer deployable user message. - * - * @param nodes Destination nodes. - * @param msg Message to send. * @param topic Message topic to use. * @param ordered Is message ordered? * @param timeout Message timeout in milliseconds for ordered messages. @@ -1959,54 +2001,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param nodes Destination nodes. - * @param topic Topic to send the message to. - * @param topicOrd Topic ordinal value. - * @param msg Message to send. - * @param plc Type of processing. - * @param ordered Ordered flag. - * @param timeout Message timeout. - * @param skipOnTimeout Whether message can be skipped in timeout. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private void send( - Collection<? extends ClusterNode> nodes, - Object topic, - int topicOrd, - Message msg, - byte plc, - boolean ordered, - long timeout, - boolean skipOnTimeout - ) throws IgniteCheckedException { - assert nodes != null; - assert topic != null; - assert msg != null; - - if (!ordered) - assert F.find(nodes, null, F.localNode(locNodeId)) == null : - "Internal Ignite code should never call the method with local node in a node list."; - - try { - // Small optimization, as communication SPIs may have lighter implementation for sending - // messages to one node vs. many. 
- if (!nodes.isEmpty()) { - for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false); - } - else if (log.isDebugEnabled()) - log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + - msg + ", policy=" + plc + ']'); - } - catch (IgniteSpiException e) { - throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " + - "TCP connection cannot be established due to firewall issues) " + - "[nodes=" + nodes + ", topic=" + topic + - ", msg=" + msg + ", policy=" + plc + ']', e); - } - } - - /** * @param topic Listener's topic. * @param lsnr Listener to add. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index ffbde37..23d186a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -28,6 +28,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -299,6 +300,11 @@ class GridDeploymentCommunication { if (log.isDebugEnabled()) 
log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']'); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send peer class loading response to node " + + "(node does not exist): " + nodeId); + } catch (IgniteCheckedException e) { if (ctx.discovery().pingNodeNoError(nodeId)) U.error(log, "Failed to send peer class loading response to node: " + nodeId, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 6d74bd0..bd43e43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -1276,6 +1277,10 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send event query response, node failed 
[node=" + nodeId + ']'); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" + res + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 09eea27..a920bd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; @@ -1079,6 +1080,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param nodeId Node ID. + * @param sndErr Send error. + * @return {@code True} if node left. + * @param ping {@code True} if try ping node. + * @throws IgniteClientDisconnectedCheckedException If ping failed. + */ + public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping) + throws IgniteClientDisconnectedCheckedException { + return cctx.gridIO().checkNodeLeft(nodeId, sndErr, ping); + } + + /** * Sends communication message. * * @param node Node to send the message to. 
@@ -1107,6 +1120,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { return; } + catch (ClusterTopologyCheckedException e) { + throw e; + } catch (IgniteCheckedException e) { if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id())) throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); @@ -1125,119 +1141,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * Sends message and automatically accounts for lefts nodes. - * - * @param nodes Nodes to send to. - * @param msg Message to send. - * @param plc IO policy. - * @param fallback Callback for failed nodes. - * @throws IgniteCheckedException If send failed. - */ - @SuppressWarnings({"BusyWait", "unchecked"}) - public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc, - @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException { - assert nodes != null; - assert msg != null; - - if (nodes.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Message will not be sent as collection of nodes is empty: " + msg); - - return; - } - - if (!onSend(msg, null)) - return; - - if (log.isDebugEnabled()) - log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); - - final Collection<UUID> leftIds = new GridLeanSet<>(); - - int cnt = 0; - - while (cnt < retryCnt) { - try { - Collection<? extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return !leftIds.contains(e.id()); - } - }); - - cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc); - - boolean added = false; - - // Even if there is no exception, we still check here, as node could have - // ignored the message during stopping. 
- for (ClusterNode n : nodes) { - if (!leftIds.contains(n.id()) && !cctx.discovery().alive(n.id())) { - leftIds.add(n.id()); - - if (fallback != null && !fallback.apply(n)) - // If fallback signalled to stop. - return; - - added = true; - } - } - - if (added) { - if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) { - if (log.isDebugEnabled()) - log.debug("Message will not be sent because all nodes left topology [msg=" + msg + - ", nodes=" + U.toShortString(nodes) + ']'); - - return; - } - } - - break; - } - catch (IgniteCheckedException e) { - boolean added = false; - - for (ClusterNode n : nodes) { - if (!leftIds.contains(n.id()) && - (!cctx.discovery().alive(n.id()) || !cctx.discovery().pingNode(n.id()))) { - leftIds.add(n.id()); - - if (fallback != null && !fallback.apply(n)) - // If fallback signalled to stop. - return; - - added = true; - } - } - - if (!added) { - cnt++; - - if (cnt == retryCnt) - throw e; - - U.sleep(retryDelay); - } - - if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) { - if (log.isDebugEnabled()) - log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" + - U.toShortString(nodes) + ']'); - - return; - } - - if (log.isDebugEnabled()) - log.debug("Message send will be retried [msg=" + msg + ", nodes=" + U.toShortString(nodes) + - ", leftIds=" + leftIds + ']'); - } - } - - if (log.isDebugEnabled()) - log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']'); - } - - /** * Sends communication message. * * @param nodeId ID of node to send the message to. 
@@ -1282,6 +1185,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { return; } + catch (ClusterTopologyCheckedException e) { + throw e; + } catch (IgniteCheckedException e) { if (cctx.discovery().node(node.id()) == null) throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e); @@ -1327,6 +1233,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); } + catch (ClusterTopologyCheckedException e) { + throw e; + } catch (IgniteCheckedException e) { if (!cctx.discovery().alive(node.id())) throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index e4df075..00c760f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryUtils; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import 
org.apache.ignite.internal.managers.discovery.CustomEventListener; @@ -589,6 +590,10 @@ final class BinaryMetadataTransport { try { ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ, resp, SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send metadata response, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send up-to-date metadata response.", e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 52f007a..71cdaad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1870,7 +1870,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (dhtFut != null) { if (req.writeSynchronizationMode() == PRIMARY_SYNC // To avoid deadlock disable back-pressure for sender data node. 
- && !ctx.discovery().cacheAffinityNode(ctx.discovery().node(nodeId), ctx.name()) + && !ctx.discovery().cacheAffinityNode(node, ctx.name()) && !dhtFut.isDone()) { final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a7122b3..c8138f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1114,7 +1114,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); - cctx.io().safeSend(nodes, m, SYSTEM_POOL, null); + for (ClusterNode node : nodes) { + try { + cctx.io().send(node, m, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + if (cctx.io().checkNodeLeft(node.id(), e, false)) { + if (log.isDebugEnabled()) + log.debug("Failed to send partitions, node failed: " + node); + } + else + U.error(log, "Failed to send partitions [node=" + node + ']', e); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index da16db7..241a1e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -115,14 +115,19 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu }); if (!nodes.isEmpty()) { - cctx.io().safeSend(nodes, req, cctx.ioPolicy(), - new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - onNodeLeft(node.id()); - - return !isDone(); + for (ClusterNode node : nodes) { + try { + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + if (cctx.io().checkNodeLeft(node.id(), e, false)) { + if (log.isDebugEnabled()) + log.debug("Failed to send cancel request, node failed: " + node); } - }); + else + U.error(log, "Failed to send cancel request [node=" + node + ']', e); + } + } } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index b112e1d..63228a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ 
-800,13 +800,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage // For example, a remote reducer has a state, we should not serialize and then send // the reducer changed by the local node. if (!F.isEmpty(rmtNodes)) { - cctx.io().safeSend(rmtNodes, req, GridIoPolicy.QUERY_POOL, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - fut.onNodeLeft(node.id()); + for (ClusterNode node : rmtNodes) { + try { + cctx.io().send(node, req, GridIoPolicy.QUERY_POOL); + } + catch (IgniteCheckedException e) { + if (cctx.io().checkNodeLeft(node.id(), e, true)) { + fut.onNodeLeft(node.id()); - return !fut.isDone(); + if (fut.isDone()) + return; + } + else + throw e; } - }); + } } if (locNode != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 0877305..a9aa13d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2475,6 +2475,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send response, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e); } @@ -2512,6 +2516,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { try { 
cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send response, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + ", res=" + res + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index a72dcd6..f641399 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1405,7 +1405,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { break; } - catch (IgniteInterruptedCheckedException e) { + catch (ClusterTopologyCheckedException | IgniteInterruptedCheckedException e) { throw e; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a991385..40988d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1709,6 +1709,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (log.isDebugEnabled()) log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); } + catch (ClusterTopologyCheckedException e) { + GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); + + fut0.onDone(e); + } catch (IgniteCheckedException e) { GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java index 0b2558a..0e049c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java @@ -177,21 +177,6 @@ public class IgfsContext { } /** - * @param node Node. - * @param topic Topic. - * @param msg Message. - * @param plc Policy. - * @throws IgniteCheckedException In case of error. - */ - public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, byte plc) - throws IgniteCheckedException { - if (!kernalContext().localNodeId().equals(node.id())) - msg.prepareMarshal(kernalContext().config().getMarshaller()); - - kernalContext().io().sendToCustomTopic(node, topic, msg, plc); - } - - /** * Checks if given node is a IGFS node. * * @param node Node to check. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index e0bc4d2..408396a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.GridTaskSessionRequest; import org.apache.ignite.internal.SkipDaemon; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -1396,7 +1397,7 @@ public class GridJobProcessor extends GridProcessorAdapter { } catch (IgniteCheckedException e) { // The only option here is to log, as we must assume that resending will fail too. - if (isDeadNode(node.id())) + if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(node.id())) // Avoid stack trace for left nodes. 
U.error(log, "Failed to reply to sender node because it left grid [nodeId=" + node.id() + ", jobId=" + req.getJobId() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index c9129c1..56e3794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -929,7 +929,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { } catch (IgniteCheckedException e) { // Log and invoke the master-leave callback. - if (isDeadNode(taskNode.id())) { + if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) { onMasterNodeLeft(); // Avoid stack trace for left nodes. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 9b65aa5..8de6c49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -30,6 +30,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.CustomEventListener; @@ -184,6 +185,10 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { new MissingMappingResponseMessage(platformId, typeId, resolvedClsName), SYSTEM_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send missing mapping response, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send missing mapping response.", e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index e73d292..6ae97dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.GridTaskSessionRequest; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -994,7 +995,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { false); } catch (IgniteCheckedException e) { - node = ctx.discovery().node(nodeId); + node = e instanceof ClusterTopologyCheckedException ? 
null : ctx.discovery().node(nodeId); if (node != null) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 5a1bfdb..ec95001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1284,6 +1284,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true), PUBLIC_POOL); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send cancel request, node failed: " + nodeId); + } catch (IgniteCheckedException e) { try { if (!isDeadNode(nodeId)) @@ -1411,7 +1415,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { IgniteException fakeErr = null; try { - boolean deadNode = isDeadNode(res.getNode().id()); + boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id()); // Avoid stack trace if node has left grid. 
if (deadNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 5d74a80..addb840 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -138,6 +138,8 @@ import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING; /** * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses @@ -487,7 +489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati connectedNew(recoveryDesc, ses, true); else { if (c.failed) { - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); for (GridNioSession ses0 : nioSrvr.sessions()) { ConnectionKey key0 = ses0.meta(CONN_IDX_META); @@ -520,7 +522,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati "to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']'); - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); return; } @@ -552,7 +554,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati "to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']'); - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); fut.onDone(oldClient); @@ -594,7 +596,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ", rmtNodeOrder=" + rmtNode.order() + ']'); } - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); } else { // The code below causes a race condition between shmem and TCP (see IGNITE-1294) @@ -608,7 +610,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } - @Override public void onMessage(GridNioSession ses, Message msg) { + @Override public void onMessage(final GridNioSession ses, Message msg) { ConnectionKey connKey = ses.meta(CONN_IDX_META); if (connKey == null) { @@ -618,7 +620,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isDebugEnabled()) log.debug("Close incoming connection, failed to enter gateway."); - ses.close(); + ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + ses.close(); + } + }); return; } @@ -822,7 +828,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); } else - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); } catch (IgniteCheckedException e) { U.error(log, "Failed to send message: " + e, e); @@ -2377,14 +2383,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (connectGate != null) connectGate.stopped(); - // Force closing. 
- for (GridCommunicationClient[] clients0 : clients.values()) { - for (GridCommunicationClient client : clients0) { - if (client != null) - client.forceClose(); - } - } - getSpiContext().deregisterPorts(); getSpiContext().removeLocalEventListener(discoLsnr); @@ -3022,7 +3020,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati return null; } - long rcvCnt = -1; + Long rcvCnt = null; Map<Integer, Object> meta = new HashMap<>(); @@ -3050,11 +3048,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati sslMeta, handshakeConnIdx); - if (rcvCnt == -1) + if (rcvCnt == ALREADY_CONNECTED) { + recoveryDesc.release(); + return null; + } + else if (rcvCnt == NODE_STOPPING) { + recoveryDesc.release(); + + throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id()); + } } finally { - if (recoveryDesc != null && rcvCnt == -1) + if (recoveryDesc != null && rcvCnt == null) recoveryDesc.release(); } @@ -3144,6 +3150,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati // Continue loop. 
} } + catch (ClusterTopologyCheckedException e) { + throw e; + } catch (Exception e) { if (client != null) { client.forceClose(); @@ -4045,6 +4054,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati client.release(); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']'); + } catch (IgniteCheckedException | IgniteException e) { try { if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { @@ -4439,6 +4452,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** */ private static final long serialVersionUID = 0L; + /** */ + static final long ALREADY_CONNECTED = -1; + + /** */ + static final long NODE_STOPPING = -2; + /** Message body size in bytes. */ private static final int MESSAGE_SIZE = 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java index 1d1b519..572f356 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java @@ -155,7 +155,6 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { final Ignite node1 = ignite(1); UUID id0 = node0.cluster().localNode().id(); - UUID id1 = node1.cluster().localNode().id(); TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearSingleGetResponse.class, node1.name()); @@ -233,9 +232,9 @@ public class IgniteDiagnosticMessagesTest 
extends GridCommonAbstractTest { IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { - try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { Integer key = keys.get(idx.getAndIncrement() % keys.size()); cache.putIfAbsent(key, String.valueOf(key)); @@ -332,9 +331,9 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { fut.add(GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - try (Transaction tx = node0.transactions().txStart()) { - IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME); + IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME); + try (Transaction tx = node0.transactions().txStart()) { key.set(primaryKey(cache)); cache.putIfAbsent(key.get(), "dummy val"); @@ -351,11 +350,11 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { fut.add(GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { + IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + try (Transaction tx = node1.transactions().txStart()) { l1.await(); - IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); - cache.replace(key.get(), "dummy val2"); tx.commit(); @@ -431,9 +430,9 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { - try (Transaction tx = node1.transactions().txStart()) { - IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME); + IgniteCache<Object, Object> cache = 
node1.cache(DEFAULT_CACHE_NAME); + try (Transaction tx = node1.transactions().txStart()) { Integer key = keys.get(idx.getAndIncrement()); cache.getAndPut(key, "new-" + key); http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index 3f6318f..9f447df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -107,7 +107,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, null, false, 0, false); } catch (IgniteCheckedException ignored) { // No-op. We are using mocks so real sending is impossible. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java new file mode 100644 index 0000000..d34de12 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.MINUTES; + +/** + * Sanity test to check that node starts to reject connections when stop procedure started. 
+ */ +public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static CountDownLatch stopLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDiscoverySpi(new TestDiscoverySpi()); + + TcpDiscoverySpi discoSpi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + discoSpi.setReconnectCount(2); + discoSpi.setAckTimeout(30_000); + discoSpi.setSocketTimeout(30_000); + discoSpi.setIpFinder(IP_FINDER); + + TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi(); + + commSpi.setConnectTimeout(600000); + commSpi.setMaxConnectTimeout(600000 * 10); + commSpi.setReconnectCount(100); + commSpi.setSocketWriteTimeout(600000); + commSpi.setAckSendThreshold(100); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testNodeStop() throws Exception { + Ignite srv = startGrid(0); + + client = true; + + final Ignite c = startGrid(1); + + ClusterGroup grp = srv.cluster().forClients(); + + IgniteCompute srvCompute = srv.compute(grp); + + srvCompute.call(new DummyClosure()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + IgniteCache cache = c.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100_000; i++) { + try { + cache.put(1, 1); + } + catch (Exception ignore) { + break; + } + } + } + }, "cache-put"); + + U.sleep(100); + + final CountDownLatch stopStartLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + stopStartLatch.countDown(); + + c.close(); + } + }); + + boolean err = false; + + try{ + stopStartLatch.await(); + + IgniteCacheMessageRecoveryAbstractTest.closeSessions(srv); + + long stopTime = U.currentTimeMillis() + 10_000; + + while (U.currentTimeMillis() < stopTime) { + try { + srvCompute.call(new DummyClosure()); + } + catch (ClusterTopologyException e) { + err = true; + + assertFalse(fut2.isDone()); + + break; + } + } + } + finally { + stopLatch.countDown(); + } + + fut.get(); + fut2.get(); + + assertTrue("Failed to get excpected error", err); + } + + + /** + * + */ + public static class DummyClosure implements IgniteCallable<Object> { + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return 1; + } + } + + /** + * + */ + static class TestDiscoverySpi extends TcpDiscoverySpi { + @Override public void spiStop() throws IgniteSpiException { + // Called communication SPI onContextDestroyed, but do not allow discovery to stop. + if (ignite.configuration().isClientMode()) { + try { + stopLatch.await(1, MINUTES); + } + catch (InterruptedException ignore) { + // No-op. 
+ } + } + + super.spiStop(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 140b1a5..de509ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest; import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; @@ -177,6 +178,8 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(IgniteDiagnosticMessagesTest.class); + suite.addTestSuite(IgniteRejectConnectOnNodeStopTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index aa97197..6b7ba75 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -791,7 +791,7 @@ public class GridMapQueryExecutor { ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); } catch (IgniteCheckedException e) { - log.error("Failed to send message.", e); + U.error(log, "Failed to send message.", e); throw new IgniteException(e); }
