IGNITE-3727: Added tests for multi-threaded sendOrdered
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9ed5c65 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9ed5c65 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9ed5c65 Branch: refs/heads/ignite-3727-2 Commit: c9ed5c6583e803110a34fc0126ae4e5af56331b1 Parents: de59444 Author: DmitriyGovorukhin <[email protected]> Authored: Wed Sep 14 12:58:20 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Wed Sep 14 12:58:20 2016 +0300 ---------------------------------------------------------------------- .../messaging/IgniteMessagingSendAsyncTest.java | 191 ++++++++++++++++++- 1 file changed, 183 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c9ed5c65/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java index 4cfff38..73baeba 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java @@ -18,16 +18,20 @@ package org.apache.ignite.messaging; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; import org.junit.Assert; import java.io.Serializable; import java.util.List; +import java.util.Map; import java.util.UUID; +import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -45,6 +49,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme */ private final String msgStr = "message"; + /** Count threads for multi-thread test */ + private final int threads = 10; + /** * {@inheritDoc} */ @@ -183,6 +190,174 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + */ + public void testSendOrderedDefaultModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message()); + } + + /** + */ + public void testSendOrderedAsyncModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message().withAsync()); + } + + /** + */ + public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message()); + } + + /** + */ + public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync()); + } + + /** + * @param ignite2 Ignite 2. + * @param ignMsg IgniteMessage. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg + ) throws InterruptedException { + final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); + + final List<String> msgs = orderedMsg(); + + sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs); + + } + + + /** + * @param ignMsg IgniteMessage. 
+ */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg + ) throws InterruptedException { + final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); + + final List<String> msgs = orderedMsg(); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + + } + + /** + * @param ignite2 Ignite 2. + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List msgs. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg, + final ConcurrentMap<String, List<String>> expMsg, + final ConcurrentMap<String, List<String>> actlMsg, + final List<String> msgs + ) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(threads * msgs.size()); + + final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); + actlMsgNode2.get(msg.threadName).add(msg.message); + latch.countDown(); + return true; + } + }); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + + latch.await(); + + assertEquals(expMsg.size(), actlMsgNode2.size()); + + for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) + assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue())); + } + + /** + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List msgs. 
+ */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg, + final ConcurrentMap<String, List<String>> expMsg, + final ConcurrentMap<String, List<String>> actlMsg, + final List<String> msgs + ) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(threads * msgs.size()); + + ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); + actlMsg.get(msg.threadName).add(msg.message); + latch.countDown(); + return true; + } + }); + + for (int i = 0; i < threads; i++) + new Thread(new Runnable() { + @Override public void run() { + String thdName = Thread.currentThread().getName(); + List<String> exp = Lists.newArrayList(); + expMsg.put(thdName, exp); + + for (String msg : msgs) { + exp.add(msg); + ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000); + } + + } + }).start(); + + latch.await(); + + assertEquals(expMsg.size(), actlMsg.size()); + + for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) + assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue())); + } + + /** + */ + private class Message implements Serializable{ + /** Thread name. */ + private String threadName; + /** Message. */ + private String message; + + /** + * @param threadName Thread name. + * @param msg Message. + */ + private Message(String threadName, String msg) { + this.threadName = threadName; + this.message = msg; + } + } + + /** * @param igniteMsg Ignite message. * @param msgStr Message string. * @param cls Callback for compare result. @@ -277,22 +452,22 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme * @param msgs messages for send. * @param cls Callback for compare result. 
*/ - private void sendOrdered( + private<T> void sendOrdered( final IgniteMessaging igniteMsg, - final List<String> msgs, - final IgniteBiInClosure<List<String>,List<Thread>> cls + final List<T> msgs, + final IgniteBiInClosure<List<T>,List<Thread>> cls ) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(msgs.size()); - final List<String> received = Lists.newArrayList(); + final List<T> received = Lists.newArrayList(); final List<Thread> threads = Lists.newArrayList(); - for (String msg : msgs) + for (T msg : msgs) igniteMsg.sendOrdered(TOPIC, msg, 1000); - igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { - @Override public boolean apply(UUID uuid, String s) { + igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() { + @Override public boolean apply(UUID uuid, T s) { received.add(s); threads.add(Thread.currentThread()); latch.countDown(); @@ -312,7 +487,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme final List<String> msgs = Lists.newArrayList(); for (int i = 0; i < 1000; i++) - msgs.add("" + i); + msgs.add("" + ThreadLocalRandom8.current().nextInt()); return msgs; }
