Repository: ignite
Updated Branches:
  refs/heads/master e16a6e16a -> 1e77c3b0d


IGNITE-10622 Fixed missed discovery event in the case of the next node failure 
- Fixes #5628.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e77c3b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e77c3b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e77c3b0

Branch: refs/heads/master
Commit: 1e77c3b0df68db0085b91aa5210e571a19d4bc00
Parents: e16a6e1
Author: Anton Kalashnikov <[email protected]>
Authored: Tue Dec 11 15:39:42 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Tue Dec 11 15:39:42 2018 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  2 +-
 .../TcpDiscoveryPendingMessageDeliveryTest.java | 55 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1e77c3b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index bab9ec0..99e1424 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3224,7 +3224,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             assert !forceSndPending || msg instanceof 
TcpDiscoveryNodeLeftMessage;
 
-                            if (failure || forceSndPending) {
+                            if (failure || forceSndPending || newNextNode) {
                                 if (log.isDebugEnabled())
                                     log.debug("Pending messages will be sent 
[failure=" + failure +
                                         ", newNextNode=" + newNextNode +

http://git-wip-us.apache.org/repos/asf/ignite/blob/1e77c3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index 6e5588a..51a8978 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -76,6 +77,8 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends 
GridCommonAbstractTe
             disco = new DyingDiscoverySpi();
         else if (igniteInstanceName.startsWith("listener"))
             disco = new ListeningDiscoverySpi();
+        else if (igniteInstanceName.startsWith("receiver"))
+            disco = new DyingThreadDiscoverySpi();
         else
             disco = new TcpDiscoverySpi();
 
@@ -191,6 +194,47 @@ public class TcpDiscoveryPendingMessageDeliveryTest 
extends GridCommonAbstractTe
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDeliveryAllFailedMessagesInCorrectOrder() throws Exception 
{
+        IgniteEx coord = startGrid("coordinator");
+        TcpDiscoverySpi coordDisco = 
(TcpDiscoverySpi)coord.configuration().getDiscoverySpi();
+
+        Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new 
GridConcurrentHashSet<>();
+        coordDisco.addSendMessageListener(msg -> {
+            if (coordDisco.ensured(msg))
+                sentEnsuredMsgs.add(msg);
+        });
+
+        //Node which receives messages but will not send them further around the 
ring.
+        IgniteEx receiver = startGrid("receiver");
+
+        //Node which will be failed first.
+        IgniteEx dummy = startGrid("dummy");
+
+        //Node which should receive all node failure messages in any case.
+        startGrid("listener");
+
+        sentEnsuredMsgs.clear();
+        receivedEnsuredMsgs.clear();
+
+        blockMsgs = true;
+
+        log.info("Sending fail node messages");
+
+        coord.context().discovery().failNode(dummy.localNode().id(), "Dummy 
node failed");
+        coord.context().discovery().failNode(receiver.localNode().id(), 
"Receiver node failed");
+
+        boolean delivered = GridTestUtils.waitForCondition(() -> {
+            log.info("Waiting for messages delivery");
+
+            return receivedEnsuredMsgs.equals(sentEnsuredMsgs);
+        }, 5000);
+
+        assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + 
receivedEnsuredMsgs, delivered);
+    }
+
+    /**
      * @param disco Discovery SPI.
      * @param id Message id.
      */
@@ -199,6 +243,17 @@ public class TcpDiscoveryPendingMessageDeliveryTest 
extends GridCommonAbstractTe
     }
 
     /**
+     * Discovery SPI that makes a thread die when {@code blockMsgs} is set 
to {@code true}.
+     */
+    private class DyingThreadDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (blockMsgs)
+                throw new RuntimeException("Thread is dying");
+        }
+    }
+
+    /**
      * Discovery SPI, that makes a node stop sending messages when {@code 
blockMsgs} is set to {@code true}.
      */
     private class DyingDiscoverySpi extends TcpDiscoverySpi {

Reply via email to