Merge remote-tracking branch 'remotes/origin/master' into ignite-3727-2 # Conflicts: # modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68f2d38e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68f2d38e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68f2d38e Branch: refs/heads/ignite-3727-2 Commit: 68f2d38e3fda3adedc522d363bb02c2100a9a084 Parents: 91e8340 Author: sboikov <[email protected]> Authored: Tue Feb 14 16:08:26 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Feb 14 16:08:26 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteMessaging.java | 10 +- .../ignite/internal/IgniteMessagingImpl.java | 2 +- .../managers/communication/GridIoManager.java | 3 +- ...niteMessagingConfigVariationFullApiTest.java | 20 +- .../messaging/IgniteMessagingSendAsyncTest.java | 230 +++++++++++-------- 5 files changed, 164 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index b0cbe1d..d769eb2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -77,8 +77,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. - * When you invoke method, all listeners who were registered on topic in the local node, will executing in the same thread - * by default, or if you use {@link #withAsync()}, listeners will execute through thread pool, and current thread will not be block. + * When you invoke method, all listeners who were registered on topic in the local node, will executing in + * the same thread by default, or if you use {@link #withAsync()}, listeners will execute + * through thread pool, and current thread will not be block. * * @param topic Topic to send to, {@code null} for default topic. * @param msg Message to send. @@ -89,8 +90,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given messages with the specified topic to the nodes in the underlying cluster group. - * When you invoke method, all listeners who were registered on topic in the local node, will executing in the same thread - * by default, or if you use {@link #withAsync()}, listeners will execute through thread pool, and current thread will not be block. + * When you invoke method, all listeners who were registered on topic in the local node, will executing + * in the same thread by default, or if you use {@link #withAsync()}, listeners will execute + * through thread pool, and current thread will not be block. * * @param topic Topic to send to, {@code null} for default topic. * @param msgs Messages to send. Order of the sending is undefined. If the method produces http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index e586aa2..541fad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (timeout == 0) timeout = ctx.config().getNetworkTimeout(); - ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, isAsync()); + ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index cda1321..50a4efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -685,7 +685,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case UTILITY_CACHE_POOL: case MARSH_CACHE_POOL: case IDX_POOL: - case IGFS_POOL: { + case IGFS_POOL: + { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); else http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java index f66535a..c6b46d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java @@ -289,6 +289,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Single server test. + * + * @param async Async message send flag. * @throws Exception If failed. */ private void localServerInternal(boolean async) throws Exception { @@ -345,6 +347,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Server sends a message and client receives it. + * + * @param async Async message send flag. * @throws Exception If failed. */ private void serverClientMessage(boolean async) throws Exception { @@ -359,6 +363,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Client sends a message and client receives it. + * + * @param async Async message send flag. * @throws Exception If failed. */ private void clientClientMessage(boolean async) throws Exception { @@ -373,7 +379,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Client sends a message and client receives it. - * @param async flag async mode + * + * @param async Async message send flag. * @throws Exception If failed. */ private void clientServerMessage(boolean async) throws Exception { @@ -389,6 +396,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * @param ignite Ignite. * @param grp Cluster group. + * @param async Async message send flag. * @throws Exception If fail. */ private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { @@ -443,7 +451,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ private void orderedMessage(boolean async) throws Exception { @@ -457,7 +465,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ private void clientServerOrderedMessage(boolean async) throws Exception { @@ -471,7 +479,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ private void clientClientOrderedMessage(boolean async) throws Exception { @@ -485,7 +493,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * + * @param async Async message send flag. * @throws Exception If fail. */ private void serverClientOrderedMessage(boolean async) throws Exception { @@ -501,6 +509,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * @param ignite Ignite. * @param grp Cluster group. + * @param async Async message send flag. * @throws Exception If fail. */ private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { @@ -531,6 +540,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param nodeSnd Sender Ignite node. * @param grp Cluster group. * @param msg Message. + * @param async Async message send flag. */ private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) { if (async) http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java index 73baeba..75e7d22 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java @@ -19,14 +19,6 @@ package org.apache.ignite.messaging; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ThreadLocalRandom8; -import org.junit.Assert; - import java.io.Serializable; import java.util.List; import java.util.Map; @@ -34,36 +26,54 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; +import org.junit.Assert; /** * */ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable { - /** - * Topic name. - */ + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Threads number for multi-thread tests. */ + private static final int THREADS = 10; + + /** */ private final String TOPIC = "topic"; - /** - * Message string. - */ + /** */ private final String msgStr = "message"; - /** Count threads for multi-thread test */ - private final int threads = 10; + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); - /** - * {@inheritDoc} - */ - @Override protected void afterTest() throws Exception { - super.afterTest(); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { stopAllGrids(); + + super.afterTest(); } /** - * Test for check, that if use default mode, local listeners execute - * in the same thread. 1 node in topology. + * Checks if use default mode, local listeners execute in the same thread, 1 node in topology. + * + * @throws Exception If failed. */ public void testSendDefaultMode() throws Exception { Ignite ignite1 = startGrid(1); @@ -77,8 +87,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that if use async mode, local listeners execute - * in another thread(through pool). 1 node in topology. + * Checks if use async mode, local listeners execute in another thread, 1 node in topology. + * + * @throws Exception If failed. */ public void testSendAsyncMode() throws Exception { Ignite ignite1 = startGrid(1); @@ -92,14 +103,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that if use default mode, local listeners execute - * in the same thread. 2 node in topology. + * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology. + * + * @throws Exception If failed. */ - public void testSendDefaultMode2Node() throws Exception { + public void testSendDefaultMode2Nodes() throws Exception { Ignite ignite1 = startGrid(1); Ignite ignite2 = startGrid(2); - sendWith2Node(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { + sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { @Override public void apply(String msg, Thread thread) { Assert.assertEquals(Thread.currentThread(), thread); Assert.assertEquals(msgStr, msg); @@ -108,14 +120,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that if use async mode, local listeners execute - * in another thread(through pool). 2 node in topology. + * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology. + * + * @throws Exception If failed. */ public void testSendAsyncMode2Node() throws Exception { Ignite ignite1 = startGrid(1); Ignite ignite2 = startGrid(2); - sendWith2Node(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { + sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { @Override public void apply(String msg, Thread thread) { Assert.assertTrue(!Thread.currentThread().equals(thread)); Assert.assertEquals(msgStr, msg); @@ -124,12 +137,14 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that SendOrdered work in our thread pool. 1 node in topology. + * Checks that sendOrdered works in thread pool, 1 node in topology. + * + * @throws Exception If failed. */ public void testSendOrderedDefaultMode() throws Exception { Ignite ignite1 = startGrid(1); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { @Override public void apply(List<String> received, List<Thread> threads) { @@ -140,12 +155,14 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that SendOrdered work in our thread pool. 1 node in topology. + * Checks that sendOrdered work in thread pool, 1 node in topology. + * + * @throws Exception If failed. */ public void testSendOrderedAsyncMode() throws Exception { Ignite ignite1 = startGrid(1); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { @Override public void apply(List<String> received, List<Thread> threads) { @@ -156,13 +173,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that SendOrdered work in our thread pool. 2 node in topology. + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. */ public void testSendOrderedDefaultMode2Node() throws Exception { Ignite ignite1 = startGrid(1); Ignite ignite2 = startGrid(2); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { @Override public void apply(List<String> received, List<Thread> threads) { @@ -173,13 +192,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * Test for check, that SendOrdered work in our thread pool. 2 node in topology. + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. */ public void testSendOrderedAsyncMode2Node() throws Exception { Ignite ignite1 = startGrid(1); Ignite ignite2 = startGrid(2); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { @Override public void apply(List<String> received, List<Thread> threads) { @@ -190,6 +211,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + * @throws Exception If failed. */ public void testSendOrderedDefaultModeMultiThreads() throws Exception { Ignite ignite = startGrid(1); @@ -198,6 +220,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + * @throws Exception If failed. */ public void testSendOrderedAsyncModeMultiThreads() throws Exception { Ignite ignite = startGrid(1); @@ -206,6 +229,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + * @throws Exception If failed. */ public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception { Ignite ignite1 = startGrid(1); @@ -215,6 +239,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** + * @throws Exception If failed. */ public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception { Ignite ignite1 = startGrid(1); @@ -224,44 +249,45 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * @param ignite2 Ignite 2. + * @param ignite2 Second node. * @param ignMsg IgniteMessage. + * @throws Exception If failed. */ private void sendOrderedMultiThreadsWith2Node( final Ignite ignite2, final IgniteMessaging ignMsg - ) throws InterruptedException { + ) throws Exception { final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs); } - /** - * @param ignMsg IgniteMessage. + * @param ignMsg IgniteMessaging. + * @throws Exception If failed. */ private void sendOrderedMultiThreads( final IgniteMessaging ignMsg - ) throws InterruptedException { + ) throws Exception { final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); - final List<String> msgs = orderedMsg(); + final List<String> msgs = orderedMessages(); sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); - } /** - * @param ignite2 Ignite 2. + * @param ignite2 Second node. * @param ignMsg Ignite for send message. * @param expMsg Expected messages map. * @param actlMsg Actual message map. - * @param msgs List msgs. + * @param msgs List of messages. + * @throws Exception If failed. */ private void sendOrderedMultiThreadsWith2Node( final Ignite ignite2, @@ -269,16 +295,19 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme final ConcurrentMap<String, List<String>> expMsg, final ConcurrentMap<String, List<String>> actlMsg, final List<String> msgs - ) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(threads * msgs.size()); + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap(); ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { @Override public boolean apply(UUID uuid, Message msg) { actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); - actlMsgNode2.get(msg.threadName).add(msg.message); + + actlMsgNode2.get(msg.threadName).add(msg.msg); + latch.countDown(); + return true; } }); @@ -297,34 +326,41 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme * @param ignMsg Ignite for send message. * @param expMsg Expected messages map. * @param actlMsg Actual message map. - * @param msgs List msgs. + * @param msgs List of messages. + * @throws Exception If failed. */ private void sendOrderedMultiThreads( final IgniteMessaging ignMsg, final ConcurrentMap<String, List<String>> expMsg, final ConcurrentMap<String, List<String>> actlMsg, final List<String> msgs - ) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(threads * msgs.size()); + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { @Override public boolean apply(UUID uuid, Message msg) { actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); - actlMsg.get(msg.threadName).add(msg.message); + + actlMsg.get(msg.threadName).add(msg.msg); + latch.countDown(); + return true; } }); - for (int i = 0; i < threads; i++) + for (int i = 0; i < THREADS; i++) new Thread(new Runnable() { @Override public void run() { String thdName = Thread.currentThread().getName(); + List<String> exp = Lists.newArrayList(); + expMsg.put(thdName, exp); for (String msg : msgs) { exp.add(msg); + ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000); } @@ -340,40 +376,26 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - */ - private class Message implements Serializable{ - /** Thread name. */ - private String threadName; - /** Message. */ - private String message; - - /** - * @param threadName Thread name. - * @param msg Message. - */ - private Message(String threadName, String msg) { - this.threadName = threadName; - this.message = msg; - } - } - - /** + * @param ignite2 Second node. * @param igniteMsg Ignite message. * @param msgStr Message string. * @param cls Callback for compare result. + * @throws Exception If failed. */ - private void sendWith2Node( + private void sendWith2Nodes( final Ignite ignite2, final IgniteMessaging igniteMsg, final String msgStr, - final IgniteBiInClosure<String,Thread> cls + final IgniteBiInClosure<String, Thread> cls ) throws Exception { final CountDownLatch latch = new CountDownLatch(1); ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { @Override public boolean apply(UUID uuid, String msg) { Assert.assertEquals(msgStr, msg); + latch.countDown(); + return true; } }); @@ -384,15 +406,16 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * @param igniteMsg Ignite message. + * @param igniteMsg Ignite messaging. * @param msgStr Message string. * @param cls Callback for compare result. + * @throws Exception If failed. */ private void send( final IgniteMessaging igniteMsg, final String msgStr, - final IgniteBiInClosure<String,Thread> cls - ) throws InterruptedException { + final IgniteBiInClosure<String, Thread> cls + ) throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Thread> thread = new AtomicReference<>(); @@ -401,8 +424,11 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { @Override public boolean apply(UUID uuid, String msgStr) { thread.set(Thread.currentThread()); + val.set(msgStr); + latch.countDown(); + return true; } }); @@ -415,18 +441,18 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * @param ignite2 Ignite 2. + * @param ignite2 Second node. * @param igniteMsg Ignite message. * @param msgs messages for send. * @param cls Callback for compare result. + * @throws Exception If failed. */ private void sendOrderedWith2Node( final Ignite ignite2, final IgniteMessaging igniteMsg, final List<String> msgs, - final IgniteBiInClosure<List<String>,List<Thread>> cls + final IgniteBiInClosure<List<String>, List<Thread>> cls ) throws Exception { - final CountDownLatch latch = new CountDownLatch(msgs.size()); final List<String> received = Lists.newArrayList(); @@ -434,7 +460,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { @Override public boolean apply(UUID uuid, String msg) { received.add(msg); + latch.countDown(); + return true; } }); @@ -444,20 +472,19 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme latch.await(); assertTrue(msgs.equals(received)); - } /** * @param igniteMsg Ignite message. * @param msgs messages for send. * @param cls Callback for compare result. + * @throws Exception If failed. */ private<T> void sendOrdered( final IgniteMessaging igniteMsg, final List<T> msgs, final IgniteBiInClosure<List<T>,List<Thread>> cls - ) throws InterruptedException { - + ) throws Exception { final CountDownLatch latch = new CountDownLatch(msgs.size()); final List<T> received = Lists.newArrayList(); @@ -469,8 +496,11 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() { @Override public boolean apply(UUID uuid, T s) { received.add(s); + threads.add(Thread.currentThread()); + latch.countDown(); + return true; } }); @@ -481,14 +511,34 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest impleme } /** - * @return List ordered messages + * @return List of ordered messages */ - private List<String> orderedMsg() { + private List<String> orderedMessages() { final List<String> msgs = Lists.newArrayList(); for (int i = 0; i < 1000; i++) - msgs.add("" + ThreadLocalRandom8.current().nextInt()); + msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt())); return msgs; } + + /** + * + */ + private static class Message implements Serializable{ + /** Thread name. */ + private final String threadName; + + /** Message. */ + private final String msg; + + /** + * @param threadName Thread name. + * @param msg Message. + */ + private Message(String threadName, String msg) { + this.threadName = threadName; + this.msg = msg; + } + } }
