IGNITE-3727: Added tests for sendOrdered and Javadoc describing sendOrdered behavior
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de59444b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de59444b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de59444b Branch: refs/heads/ignite-3727-2 Commit: de59444b235ae325bb35976dbd0f211ce8dfb69c Parents: 60b030c Author: DmitriyGovorukhin <[email protected]> Authored: Wed Sep 14 11:18:10 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Wed Sep 14 11:18:10 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteMessaging.java | 4 +- .../messaging/IgniteMessagingSendAsyncTest.java | 147 ++++++++++++++++++- 2 files changed, 149 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index 00a4fc8..b0cbe1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -103,7 +103,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with * this method will arrive in the same order they were sent. Note that if a topic is used - * for ordered messages, then it cannot be reused for non-ordered messages. + * for ordered messages, then it cannot be reused for non-ordered messages. 
Note that if a local listener is registered + * on this topic, all messages are processed through the thread pool, and the current thread is never blocked + * when you invoke sendOrdered, regardless of the mode used (default or {@link #withAsync()}). * <p> * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue, * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered http://git-wip-us.apache.org/repos/asf/ignite/blob/de59444b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java index 66c9ed0..4cfff38 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.messaging; +import com.google.common.collect.Lists; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.lang.IgniteBiInClosure; @@ -25,6 +26,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; import java.io.Serializable; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -65,7 +67,6 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme Assert.assertEquals(msgStr, msg); } }); - } /** @@ -116,6 +117,72 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + * Test for check, that SendOrdered work in our thread pool. 1 node in topology. 
+ */ + public void testSendOrderedDefaultMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMsg(); + + sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Test for check, that SendOrdered work in our thread pool. 1 node in topology. + */ + public void testSendOrderedAsyncMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMsg(); + + sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Test for check, that SendOrdered work in our thread pool. 2 node in topology. + */ + public void testSendOrderedDefaultMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List<String> msgs = orderedMsg(); + + sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Test for check, that SendOrdered work in our thread pool. 2 node in topology. 
+ */ + public void testSendOrderedAsyncMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List<String> msgs = orderedMsg(); + + sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** * @param igniteMsg Ignite message. * @param msgStr Message string. * @param cls Callback for compare result. @@ -171,4 +238,82 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme cls.apply(val.get(), thread.get()); } + + /** + * @param ignite2 Ignite 2. + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. + */ + private void sendOrderedWith2Node( + final Ignite ignite2, + final IgniteMessaging igniteMsg, + final List<String> msgs, + final IgniteBiInClosure<List<String>,List<Thread>> cls + ) throws Exception { + + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List<String> received = Lists.newArrayList(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String msg) { + received.add(msg); + latch.countDown(); + return true; + } + }); + + sendOrdered(igniteMsg, msgs, cls); + + latch.await(); + + assertTrue(msgs.equals(received)); + + } + + /** + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. 
+ */ + private void sendOrdered( + final IgniteMessaging igniteMsg, + final List<String> msgs, + final IgniteBiInClosure<List<String>,List<Thread>> cls + ) throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List<String> received = Lists.newArrayList(); + final List<Thread> threads = Lists.newArrayList(); + + for (String msg : msgs) + igniteMsg.sendOrdered(TOPIC, msg, 1000); + + igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String s) { + received.add(s); + threads.add(Thread.currentThread()); + latch.countDown(); + return true; + } + }); + + latch.await(); + + cls.apply(received, threads); + } + + /** + * @return List ordered messages + */ + private List<String> orderedMsg() { + final List<String> msgs = Lists.newArrayList(); + + for (int i = 0; i < 1000; i++) + msgs.add("" + i); + + return msgs; + } }
