Repository: ignite
Updated Branches:
  refs/heads/master 4b4d263ea -> e2cf5cf99


IGNITE-8922 Fix for discovery message delivery guarantee can be violated - 
Fixes #4349.

Signed-off-by: Dmitriy Pavlov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2cf5cf9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2cf5cf9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2cf5cf9

Branch: refs/heads/master
Commit: e2cf5cf99907825e8e19cf162c24a816b8ffd2df
Parents: 4b4d263
Author: Denis Mekhanikov <[email protected]>
Authored: Thu Jul 19 16:11:38 2018 +0300
Committer: Dmitriy Pavlov <[email protected]>
Committed: Thu Jul 19 16:11:38 2018 +0300

----------------------------------------------------------------------
 .../discovery/CustomMessageWrapper.java         |   2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  21 +-
 .../TcpDiscoveryPendingMessageDeliveryTest.java | 280 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   3 +
 4 files changed, 301 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 4b6b7a2..c7feba3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -33,7 +33,7 @@ public class CustomMessageWrapper implements 
DiscoverySpiCustomMessage {
     /**
      * @param delegate Delegate.
      */
-    CustomMessageWrapper(DiscoveryCustomMessage delegate) {
+    public CustomMessageWrapper(DiscoveryCustomMessage delegate) {
         this.delegate = delegate;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 18a87ec..1bfa467 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2409,12 +2409,22 @@ class ServerImpl extends TcpDiscoveryImpl {
             msgs.add(new PendingMessage(msg));
 
             while (msgs.size() > MAX) {
-                PendingMessage polled = msgs.poll();
+                PendingMessage queueHead = msgs.peek();
 
-                assert polled != null;
+                assert queueHead != null;
 
-                if (polled.id.equals(discardId))
+                if (queueHead.customMsg && customDiscardId != null) {
+                    if (queueHead.id.equals(customDiscardId))
+                        customDiscardId = null;
+                }
+                else if (!queueHead.customMsg && discardId != null) {
+                    if (queueHead.id.equals(discardId))
+                        discardId = null;
+                }
+                else
                     break;
+
+                msgs.poll();
             }
         }
 
@@ -5453,8 +5463,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (sendMessageToRemotes(msg))
                             sendMessageAcrossRing(msg);
-                        else
+                        else {
+                            registerPendingMessage(msg);
+
                             processCustomMessage(msg);
+                        }
                     }
 
                     msg.message(null, msg.messageBytes());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
new file mode 100644
index 0000000..9b3dfee
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
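+ * Checks that ensured messages sent by the coordinator reach the listener node after mediator and victim are stopped.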
+ */
+public class TcpDiscoveryPendingMessageDeliveryTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private volatile boolean blockMsgs;
+
+    /** */
+    private Set<TcpDiscoveryAbstractMessage> receivedEnsuredMsgs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        blockMsgs = false;
+        receivedEnsuredMsgs = new ConcurrentHashSet<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi disco;
+
+        if (igniteInstanceName.startsWith("victim"))
+            disco = new DyingDiscoverySpi();
+        else if (igniteInstanceName.startsWith("listener"))
+            disco = new ListeningDiscoverySpi();
+        else
+            disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPendingMessagesOverflow() throws Exception {
+        Ignite coord = startGrid("coordinator");
+        TcpDiscoverySpi coordDisco = 
(TcpDiscoverySpi)coord.configuration().getDiscoverySpi();
+
+        Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new 
ConcurrentHashSet<>();
+        coordDisco.addSendMessageListener(msg -> {
+            if (coordDisco.ensured(msg))
+                sentEnsuredMsgs.add(msg);
+        });
+
+        // Victim doesn't send acknowledgements, so we need an intermediate node to accept messages,
+        // so that the coordinator can mark them as pending.
+        Ignite mediator = startGrid("mediator");
+
+        Ignite victim = startGrid("victim");
+
+        startGrid("listener");
+
+        sentEnsuredMsgs.clear();
+        receivedEnsuredMsgs.clear();
+
+        // Initial custom message will travel across the ring and will be 
discarded.
+        sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+
+        assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + 
receivedEnsuredMsgs,
+            GridTestUtils.waitForCondition(() -> {
+                log.info("Waiting for messages delivery");
+
+                return receivedEnsuredMsgs.equals(sentEnsuredMsgs);
+            }, 10000));
+
+        blockMsgs = true;
+
+        log.info("Sending dummy custom messages");
+
+        // Non-discarded messages shouldn't be dropped from the queue.
+        int msgsNum = 2000;
+
+        for (int i = 0; i < msgsNum; i++)
+            sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+
+        mediator.close();
+        victim.close();
+
+        assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + 
receivedEnsuredMsgs,
+            GridTestUtils.waitForCondition(() -> {
+                log.info("Waiting for messages delivery");
+
+                return receivedEnsuredMsgs.equals(sentEnsuredMsgs);
+            }, 10000));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCustomMessageInSingletonCluster() throws Exception {
+        Ignite coord = startGrid("coordinator");
+        TcpDiscoverySpi coordDisco = 
(TcpDiscoverySpi)coord.configuration().getDiscoverySpi();
+
+        Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new 
ConcurrentHashSet<>();
+        coordDisco.addSendMessageListener(msg -> {
+            if (coordDisco.ensured(msg))
+                sentEnsuredMsgs.add(msg);
+        });
+
+        // Custom message on a singleton cluster shouldn't break consistency 
of PendingMessages.
+        sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+
+        // Victim doesn't send acknowledgements, so we need an intermediate node to accept messages,
+        // so that the coordinator can mark them as pending.
+        Ignite mediator = startGrid("mediator");
+
+        Ignite victim = startGrid("victim");
+
+        startGrid("listener");
+
+        sentEnsuredMsgs.clear();
+        receivedEnsuredMsgs.clear();
+
+        blockMsgs = true;
+
+        log.info("Sending dummy custom messages");
+
+        // Non-discarded messages shouldn't be dropped from the queue.
+        int msgsNum = 100;
+
+        for (int i = 0; i < msgsNum; i++)
+            sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid());
+
+        mediator.close();
+        victim.close();
+
+        assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + 
receivedEnsuredMsgs,
+            GridTestUtils.waitForCondition(() -> {
+                log.info("Waiting for messages delivery");
+
+                return receivedEnsuredMsgs.equals(sentEnsuredMsgs);
+            }, 10000));
+    }
+
+    /**
+     * @param disco Discovery SPI.
+     * @param id Message id.
+     */
+    private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) {
+        disco.sendCustomEvent(new CustomMessageWrapper(new 
DummyCustomDiscoveryMessage(id)));
+    }
+
+    /**
+     * Discovery SPI that makes a node stop sending messages when {@code blockMsgs} is set to {@code true}.
+     */
+    private class DyingDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg, byte[] data,
+            long timeout) throws IOException {
+            if (!blockMsgs)
+                super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (!blockMsgs)
+                super.writeToSocket(sock, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (!blockMsgs)
+                super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage 
msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (!blockMsgs)
+                super.writeToSocket(msg, sock, res, timeout);
+        }
+    }
+
+    /**
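+     * Discovery SPI whose {@code startMessageProcess} hook records ensured messages in {@code receivedEnsuredMsgs}.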
+     */
+    private class ListeningDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (ensured(msg))
+                receivedEnsuredMsgs.add(msg);
+        }
+    }
+
+    /**
+     * Minimal custom discovery message that carries only an id; used as a dummy payload by this test.
+     */
+    private static class DummyCustomDiscoveryMessage implements 
DiscoveryCustomMessage {
+        /** */
+        private final IgniteUuid id;
+
+        /**
+         * @param id Message id.
+         */
+        DummyCustomDiscoveryMessage(IgniteUuid id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean stopProcess() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index e34200a..4b68af4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
+import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySegmentationPolicyTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest;
@@ -128,6 +129,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends 
TestSuite {
 
         suite.addTest(new 
TestSuite(FilterDataForClientNodeDiscoveryTest.class));
 
+        suite.addTest(new 
TestSuite(TcpDiscoveryPendingMessageDeliveryTest.class));
+
         return suite;
     }
 }

Reply via email to