Repository: ignite Updated Branches: refs/heads/ignite-3727-2 [created] 68f2d38e3
IGNITE-3727: added ability to intercept "stop listener" message, check future status after invoking stopRemoteListen. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e6605f1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e6605f1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e6605f1 Branch: refs/heads/ignite-3727-2 Commit: 4e6605f1691a43b2ba57da3d4eef98e6dd460a43 Parents: 2d5d5bc Author: DmitriyGovorukhin <[email protected]> Authored: Tue Aug 30 18:38:13 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Tue Aug 30 18:38:13 2016 +0300 ---------------------------------------------------------------------- .../ignite/messaging/GridMessagingSelfTest.java | 101 +++++++++++++++++-- 1 file changed, 92 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4e6605f1/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index e796eb5..2e2afd4 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -24,27 +24,26 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteMessaging; + +import org.apache.ignite.*; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -198,7 +197,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder); @@ -944,7 +943,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. 
*/ public void testSendMessageWithExternalClassLoader() throws Exception { - URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) }; + URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; ClassLoader extLdr = new URLClassLoader(urls); @@ -1028,6 +1027,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser public void testAsync() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); + TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); + assertFalse(ignite2.message().isAsync()); final IgniteMessaging msg = ignite2.message().withAsync(); @@ -1085,6 +1086,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser assertEquals(1, msgCnt.get()); + discoSpi.blockCustomEvent(); + msg.stopRemoteListen(id); IgniteFuture<?> stopFut = msg.future(); @@ -1099,8 +1102,14 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); + Assert.assertFalse(stopFut.isDone()); + + discoSpi.stopBlock(); + stopFut.get(); + Assert.assertTrue(stopFut.isDone()); + message(ignite1.cluster().forRemotes()).send(topic, "msg2"); U.sleep(1000); @@ -1109,6 +1118,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** + * + */ + static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private boolean blockCustomEvt; + + /** */ + private final Object mux = new Object(); + + /** */ + private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + synchronized (mux) { + if (blockCustomEvt) { + DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + if (msg0 instanceof StopRoutineDiscoveryMessage) { + log.info("Block custom message: " + msg0); + blockedMsgs.add(msg); + + 
mux.notifyAll(); + } + return; + } + } + + super.sendCustomEvent(msg); + } + + /** + * + */ + public void blockCustomEvent() { + synchronized (mux) { + assert blockedMsgs.isEmpty() : blockedMsgs; + + blockCustomEvt = true; + } + } + + /** + * @throws InterruptedException If interrupted. + */ + public void waitCustomEvent() throws InterruptedException { + synchronized (mux) { + while (blockedMsgs.isEmpty()) + mux.wait(); + } + } + + /** + * + */ + public void stopBlock() { + List<DiscoverySpiCustomMessage> msgs; + + synchronized (this) { + msgs = new ArrayList<>(blockedMsgs); + + blockCustomEvt = false; + + blockedMsgs.clear(); + } + + for (DiscoverySpiCustomMessage msg : msgs) { + log.info("Resend blocked message: " + msg); + + super.sendCustomEvent(msg); + } + } + } + + /** * Tests that message listener registers only for one oldest node. * * @throws Exception If an error occurred.
