Repository: ignite Updated Branches: refs/heads/master e16a6e16a -> 1e77c3b0d
IGNITE-10622 Fixed missed discovery event in the case of the next node failure - Fixes #5628. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e77c3b0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e77c3b0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e77c3b0 Branch: refs/heads/master Commit: 1e77c3b0df68db0085b91aa5210e571a19d4bc00 Parents: e16a6e1 Author: Anton Kalashnikov <[email protected]> Authored: Tue Dec 11 15:39:42 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Dec 11 15:39:42 2018 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../TcpDiscoveryPendingMessageDeliveryTest.java | 55 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1e77c3b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index bab9ec0..99e1424 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3224,7 +3224,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; - if (failure || forceSndPending) { + if (failure || forceSndPending || newNextNode) { if (log.isDebugEnabled()) log.debug("Pending messages will be sent [failure=" + failure + ", newNextNode=" + newNextNode + http://git-wip-us.apache.org/repos/asf/ignite/blob/1e77c3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java index 6e5588a..51a8978 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -76,6 +77,8 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends GridCommonAbstractTe disco = new DyingDiscoverySpi(); else if (igniteInstanceName.startsWith("listener")) disco = new ListeningDiscoverySpi(); + else if (igniteInstanceName.startsWith("receiver")) + disco = new DyingThreadDiscoverySpi(); else disco = new TcpDiscoverySpi(); @@ -191,6 +194,47 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends GridCommonAbstractTe } /** + * @throws Exception If failed. + */ + public void testDeliveryAllFailedMessagesInCorrectOrder() throws Exception { + IgniteEx coord = startGrid("coordinator"); + TcpDiscoverySpi coordDisco = (TcpDiscoverySpi)coord.configuration().getDiscoverySpi(); + + Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new GridConcurrentHashSet<>(); + coordDisco.addSendMessageListener(msg -> { + if (coordDisco.ensured(msg)) + sentEnsuredMsgs.add(msg); + }); + + //Node which receive message but will not send it further around the ring. + IgniteEx receiver = startGrid("receiver"); + + //Node which will be failed first. + IgniteEx dummy = startGrid("dummy"); + + //Node which should received all fail message in any way. + startGrid("listener"); + + sentEnsuredMsgs.clear(); + receivedEnsuredMsgs.clear(); + + blockMsgs = true; + + log.info("Sending fail node messages"); + + coord.context().discovery().failNode(dummy.localNode().id(), "Dummy node failed"); + coord.context().discovery().failNode(receiver.localNode().id(), "Receiver node failed"); + + boolean delivered = GridTestUtils.waitForCondition(() -> { + log.info("Waiting for messages delivery"); + + return receivedEnsuredMsgs.equals(sentEnsuredMsgs); + }, 5000); + + assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, delivered); + } + + /** * @param disco Discovery SPI. * @param id Message id. */ @@ -199,6 +243,17 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends GridCommonAbstractTe } /** + * Discovery SPI, that makes a thread to die when {@code blockMsgs} is set to {@code true}. + */ + private class DyingThreadDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (blockMsgs) + throw new RuntimeException("Thread is dying"); + } + } + + /** * Discovery SPI, that makes a node stop sending messages when {@code blockMsgs} is set to {@code true}. */ private class DyingDiscoverySpi extends TcpDiscoverySpi {
