IGNITE-3727 supported sync/async execution for local listener
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6980abb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6980abb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6980abb Branch: refs/heads/ignite-3727-2 Commit: a6980abbf5d8ff80ed8a83f479729ef418dfa7c9 Parents: ecd3d93 Author: DmitriyGovorukhin <[email protected]> Authored: Thu Sep 8 20:02:43 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Thu Sep 8 20:02:43 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteMessagingImpl.java | 6 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../managers/communication/GridIoManager.java | 40 +++-- .../communication/GridIoManagerSelfTest.java | 6 +- ...niteMessagingConfigVariationFullApiTest.java | 177 +++++++++++++++---- 5 files changed, 175 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 2800777..e586aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (snapshot.isEmpty()) throw U.emptyTopologyException(); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> for (Object msg : msgs) { A.notNull(msg, "msg"); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } } catch (IgniteCheckedException e) { @@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (timeout == 0) timeout = ctx.config().getNetworkTimeout(); - ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout); + ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, isAsync()); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 4fe8ca8..43510f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -389,7 +389,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan if (msg instanceof Message) ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL); else - ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0); + ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false); } catch (IgniteCheckedException e) { throw unwrapException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 9d40bf9..124fba5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1271,7 +1271,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackC + IgniteInClosure<IgniteException> ackC, + boolean async ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1289,8 +1290,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); - else + else if (async) processRegularMessage(locNodeId, ioMsg, plc, null); + else + processRegularMessage0(ioMsg, locNodeId); if (ackC != null) ackC.apply(null); @@ -1346,7 +1349,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -1358,7 +1361,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, null); + send(node, topic, -1, msg, plc, false, 0, false, null, false); } /** @@ -1368,9 +1371,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) + public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); } /** @@ -1383,7 +1386,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topicOrd, msg, plc, false, 0, false, null); + send(node, topic, topicOrd, msg, plc, false, 0, false, null, false); } /** @@ -1405,7 +1408,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -1432,7 +1435,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -1445,7 +1448,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false); } /** @@ -1481,7 +1484,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackC); + send(node, topic, -1, msg, plc, false, 0, false, ackC, false); } /** @@ -1537,7 +1540,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } /** @@ -1548,7 +1551,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException { - sendUserMessage(nodes, msg, null, false, 0); + sendUserMessage(nodes, msg, null, false, 0, false); } /** @@ -1563,7 +1566,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ @SuppressWarnings("ConstantConditions") public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, - @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException { + @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException { boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId); byte[] serMsg = null; @@ -1617,10 +1620,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!rmtNodes.isEmpty()) send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); - // Will call local listeners in current thread synchronously, so must go the last + // Will call local listeners in current thread synchronously or through pool, + // depending async flag, so must go the last // to allow remote nodes execute the requested operation in parallel. if (locNode != null) - send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); + send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async); } } @@ -1687,7 +1691,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } /** @@ -1724,7 +1728,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index c2cfa76..f5499d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false); } catch (IgniteCheckedException ignored) { // No-op. We are using mocks so real sending is impossible. @@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false); } catch (Exception ignored) { // No-op. We are using mocks so real sending is impossible. @@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) + @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java index 31b0663..fe85970 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java @@ -58,7 +58,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testLocalServer() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - localServerInternal(); + localServerInternal(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testLocalServerAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + localServerInternal(true); } }); } @@ -83,7 +94,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientMessage(); + serverClientMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testServerClientMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + serverClientMessage(true); } }); } @@ -97,7 +122,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientMessage(); + clientClientMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientClientMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientClientMessage(false); } }); } @@ -111,7 +150,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerMessage(); + clientServerMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientServerMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientServerMessage(true); } }); } @@ -133,7 +186,18 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testOrderedMessage() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - orderedMessage(); + orderedMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testOrderedMessageAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + orderedMessage(true); } }); } @@ -147,7 +211,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerOrderedMessage(); + clientServerOrderedMessage(false); } }); } @@ -155,13 +219,42 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * @throws Exception If failed. */ + public void testClientServerOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientServerOrderedMessage(true); + } + }); + } + + + /** + * @throws Exception If failed. + */ public void testClientClientOrderedMessage() throws Exception { if (!testsCfg.withClients()) return; runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientOrderedMessage(); + clientClientOrderedMessage(true); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientClientOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + clientClientOrderedMessage(true); } }); } @@ -175,7 +268,21 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientOrderedMessage(); + serverClientOrderedMessage(false); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testServerClientOrderedMessageAsync() throws Exception { + if (!testsCfg.withClients()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + serverClientOrderedMessage(true); } }); } @@ -184,7 +291,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * Single server test. * @throws Exception If failed. */ - private void localServerInternal() throws Exception { + private void localServerInternal(boolean async) throws Exception { int messages = MSGS; Ignite ignite = grid(SERVER_NODE_IDX); @@ -197,7 +304,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i)); + sendMessage(ignite, grp, value(i), async); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -240,42 +347,43 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * Server sends a message and client receives it. * @throws Exception If failed. */ - private void serverClientMessage() throws Exception { + private void serverClientMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** * Client sends a message and client receives it. * @throws Exception If failed. */ - private void clientClientMessage() throws Exception { + private void clientClientMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** * Client sends a message and client receives it. + * @param async flag async mode * @throws Exception If failed. */ - private void clientServerMessage() throws Exception { + private void clientServerMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp); + registerListenerAndSendMessages(ignite, grp, async); } /** @@ -283,7 +391,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param grp Cluster group. * @throws Exception If fail. */ - private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception { + private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -292,7 +400,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i)); + sendMessage(ignite, grp, value(i), async); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -338,56 +446,56 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * * @throws Exception If fail. */ - private void orderedMessage() throws Exception { + private void orderedMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** * * @throws Exception If fail. */ - private void clientServerOrderedMessage() throws Exception { + private void clientServerOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** * * @throws Exception If fail. */ - private void clientClientOrderedMessage() throws Exception { + private void clientClientOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** * * @throws Exception If fail. */ - private void serverClientOrderedMessage() throws Exception { + private void serverClientOrderedMessage(boolean async) throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp); + registerListenerAndSendOrderedMessages(ignite, grp, async); } /** @@ -395,7 +503,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param grp Cluster group. * @throws Exception If fail. */ - private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception { + private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -403,8 +511,12 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener()); try { - for (int i=0; i < messages; i++) - ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); + for (int i=0; i < messages; i++){ + if (async) + ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000); + else + ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000); + } assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -420,8 +532,11 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param grp Cluster group. * @param msg Message. */ - private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) { - nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); + private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) { + if (async) + nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg); + else + nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); } /**
