Repository: ignite Updated Branches: refs/heads/master 4b4d263ea -> e2cf5cf99
IGNITE-8922 Fix for discovery message delivery guarantee can be violated - Fixes #4349. Signed-off-by: Dmitriy Pavlov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2cf5cf9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2cf5cf9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2cf5cf9 Branch: refs/heads/master Commit: e2cf5cf99907825e8e19cf162c24a816b8ffd2df Parents: 4b4d263 Author: Denis Mekhanikov <[email protected]> Authored: Thu Jul 19 16:11:38 2018 +0300 Committer: Dmitriy Pavlov <[email protected]> Committed: Thu Jul 19 16:11:38 2018 +0300 ---------------------------------------------------------------------- .../discovery/CustomMessageWrapper.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 21 +- .../TcpDiscoveryPendingMessageDeliveryTest.java | 280 +++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 3 + 4 files changed, 301 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java index 4b6b7a2..c7feba3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -33,7 +33,7 @@ public class CustomMessageWrapper implements DiscoverySpiCustomMessage { /** * @param delegate Delegate. */ - CustomMessageWrapper(DiscoveryCustomMessage delegate) { + public CustomMessageWrapper(DiscoveryCustomMessage delegate) { this.delegate = delegate; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 18a87ec..1bfa467 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2409,12 +2409,22 @@ class ServerImpl extends TcpDiscoveryImpl { msgs.add(new PendingMessage(msg)); while (msgs.size() > MAX) { - PendingMessage polled = msgs.poll(); + PendingMessage queueHead = msgs.peek(); - assert polled != null; + assert queueHead != null; - if (polled.id.equals(discardId)) + if (queueHead.customMsg && customDiscardId != null) { + if (queueHead.id.equals(customDiscardId)) + customDiscardId = null; + } + else if (!queueHead.customMsg && discardId != null) { + if (queueHead.id.equals(discardId)) + discardId = null; + } + else break; + + msgs.poll(); } } @@ -5453,8 +5463,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); - else + else { + registerPendingMessage(msg); + processCustomMessage(msg); + } } msg.message(null, msg.messageBytes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java new file mode 100644 index 0000000..9b3dfee --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class TcpDiscoveryPendingMessageDeliveryTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private volatile boolean blockMsgs; + + /** */ + private Set<TcpDiscoveryAbstractMessage> receivedEnsuredMsgs; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + blockMsgs = false; + receivedEnsuredMsgs = new ConcurrentHashSet<>(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi disco; + + if (igniteInstanceName.startsWith("victim")) + disco = new DyingDiscoverySpi(); + else if (igniteInstanceName.startsWith("listener")) + disco = new ListeningDiscoverySpi(); + else + disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPendingMessagesOverflow() throws Exception { + Ignite coord = startGrid("coordinator"); + TcpDiscoverySpi coordDisco = (TcpDiscoverySpi)coord.configuration().getDiscoverySpi(); + + Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new ConcurrentHashSet<>(); + coordDisco.addSendMessageListener(msg -> { + if (coordDisco.ensured(msg)) + sentEnsuredMsgs.add(msg); + }); + + // Victim doesn't send acknowledges, so we need an intermediate node to accept messages, + // so the coordinator could mark them as pending. + Ignite mediator = startGrid("mediator"); + + Ignite victim = startGrid("victim"); + + startGrid("listener"); + + sentEnsuredMsgs.clear(); + receivedEnsuredMsgs.clear(); + + // Initial custom message will travel across the ring and will be discarded. + sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + + assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, + GridTestUtils.waitForCondition(() -> { + log.info("Waiting for messages delivery"); + + return receivedEnsuredMsgs.equals(sentEnsuredMsgs); + }, 10000)); + + blockMsgs = true; + + log.info("Sending dummy custom messages"); + + // Non-discarded messages shouldn't be dropped from the queue. + int msgsNum = 2000; + + for (int i = 0; i < msgsNum; i++) + sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + + mediator.close(); + victim.close(); + + assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, + GridTestUtils.waitForCondition(() -> { + log.info("Waiting for messages delivery"); + + return receivedEnsuredMsgs.equals(sentEnsuredMsgs); + }, 10000)); + } + + /** + * @throws Exception If failed. + */ + public void testCustomMessageInSingletonCluster() throws Exception { + Ignite coord = startGrid("coordinator"); + TcpDiscoverySpi coordDisco = (TcpDiscoverySpi)coord.configuration().getDiscoverySpi(); + + Set<TcpDiscoveryAbstractMessage> sentEnsuredMsgs = new ConcurrentHashSet<>(); + coordDisco.addSendMessageListener(msg -> { + if (coordDisco.ensured(msg)) + sentEnsuredMsgs.add(msg); + }); + + // Custom message on a singleton cluster shouldn't break consistency of PendingMessages. + sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + + // Victim doesn't send acknowledges, so we need an intermediate node to accept messages, + // so the coordinator could mark them as pending. + Ignite mediator = startGrid("mediator"); + + Ignite victim = startGrid("victim"); + + startGrid("listener"); + + sentEnsuredMsgs.clear(); + receivedEnsuredMsgs.clear(); + + blockMsgs = true; + + log.info("Sending dummy custom messages"); + + // Non-discarded messages shouldn't be dropped from the queue. + int msgsNum = 100; + + for (int i = 0; i < msgsNum; i++) + sendDummyCustomMessage(coordDisco, IgniteUuid.randomUuid()); + + mediator.close(); + victim.close(); + + assertTrue("Sent: " + sentEnsuredMsgs + "; received: " + receivedEnsuredMsgs, + GridTestUtils.waitForCondition(() -> { + log.info("Waiting for messages delivery"); + + return receivedEnsuredMsgs.equals(sentEnsuredMsgs); + }, 10000)); + } + + /** + * @param disco Discovery SPI. + * @param id Message id. + */ + private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) { + disco.sendCustomEvent(new CustomMessageWrapper(new DummyCustomDiscoveryMessage(id))); + } + + /** + * Discovery SPI, that makes a node stop sending messages when {@code blockMsgs} is set to {@code true}. + */ + private class DyingDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + if (!blockMsgs) + super.writeToSocket(sock, msg, data, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (!blockMsgs) + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (!blockMsgs) + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + if (!blockMsgs) + super.writeToSocket(msg, sock, res, timeout); + } + } + + /** + * + */ + private class ListeningDiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (ensured(msg)) + receivedEnsuredMsgs.add(msg); + } + } + + /** + * + */ + private static class DummyCustomDiscoveryMessage implements DiscoveryCustomMessage { + /** */ + private final IgniteUuid id; + + /** + * @param id Message id. + */ + DummyCustomDiscoveryMessage(IgniteUuid id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, + DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/e2cf5cf9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index e34200a..4b68af4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySegmentationPolicyTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest; @@ -128,6 +129,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(FilterDataForClientNodeDiscoveryTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryPendingMessageDeliveryTest.class)); + return suite; } }
