IGNITE-3727 supported sync/async execution for local listener

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6980abb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6980abb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6980abb

Branch: refs/heads/ignite-3727-2
Commit: a6980abbf5d8ff80ed8a83f479729ef418dfa7c9
Parents: ecd3d93
Author: DmitriyGovorukhin <[email protected]>
Authored: Thu Sep 8 20:02:43 2016 +0300
Committer: DmitriyGovorukhin <[email protected]>
Committed: Thu Sep 8 20:02:43 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteMessagingImpl.java    |   6 +-
 .../internal/managers/GridManagerAdapter.java   |   2 +-
 .../managers/communication/GridIoManager.java   |  40 +++--
 .../communication/GridIoManagerSelfTest.java    |   6 +-
 ...niteMessagingConfigVariationFullApiTest.java | 177 +++++++++++++++----
 5 files changed, 175 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 2800777..e586aa2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends 
AsyncSupportAdapter<IgniteMessaging>
             if (snapshot.isEmpty())
                 throw U.emptyTopologyException();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+            ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, 
isAsync());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends 
AsyncSupportAdapter<IgniteMessaging>
             for (Object msg : msgs) {
                 A.notNull(msg, "msg");
 
-                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0);
+                ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, 
isAsync());
             }
         }
         catch (IgniteCheckedException e) {
@@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends 
AsyncSupportAdapter<IgniteMessaging>
             if (timeout == 0)
                 timeout = ctx.config().getNetworkTimeout();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout);
+            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, 
isAsync());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 4fe8ca8..43510f6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -389,7 +389,7 @@ public abstract class GridManagerAdapter<T extends 
IgniteSpi> implements GridMan
                             if (msg instanceof Message)
                                 ctx.io().send(node, topic, (Message)msg, 
SYSTEM_POOL);
                             else
-                                
ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0);
+                                
ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, 
false);
                         }
                         catch (IgniteCheckedException e) {
                             throw unwrapException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9d40bf9..124fba5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1271,7 +1271,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackC
+        IgniteInClosure<IgniteException> ackC,
+        boolean async
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1289,8 +1290,10 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
-            else
+            else if (async)
                 processRegularMessage(locNodeId, ioMsg, plc, null);
+            else
+                processRegularMessage0(ioMsg, locNodeId);
 
             if (ackC != null)
                 ackC.apply(null);
@@ -1346,7 +1349,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, 
false);
     }
 
     /**
@@ -1358,7 +1361,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, null);
+        send(node, topic, -1, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1368,9 +1371,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
+    public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, 
boolean async)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, 
false);
     }
 
     /**
@@ -1383,7 +1386,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, int topicOrd, Message 
msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null, false);
     }
 
     /**
@@ -1405,7 +1408,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
null);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
null, false);
     }
 
     /**
@@ -1432,7 +1435,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
null);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
null, false);
     }
 
     /**
@@ -1445,7 +1448,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
         IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, 
false);
     }
 
     /**
@@ -1481,7 +1484,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc, 
IgniteInClosure<IgniteException> ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackC);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC, false);
     }
 
     /**
@@ -1537,7 +1540,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC, false);
     }
 
     /**
@@ -1548,7 +1551,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, 
Object msg) throws IgniteCheckedException {
-        sendUserMessage(nodes, msg, null, false, 0);
+        sendUserMessage(nodes, msg, null, false, 0, false);
     }
 
     /**
@@ -1563,7 +1566,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     @SuppressWarnings("ConstantConditions")
     public void sendUserMessage(Collection<? extends ClusterNode> nodes, 
Object msg,
-        @Nullable Object topic, boolean ordered, long timeout) throws 
IgniteCheckedException {
+        @Nullable Object topic, boolean ordered, long timeout, boolean async) 
throws IgniteCheckedException {
         boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(locNodeId);
 
         byte[] serMsg = null;
@@ -1617,10 +1620,11 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (!rmtNodes.isEmpty())
                 send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
 
-            // Will call local listeners in current thread synchronously, so 
must go the last
+            // Will call local listeners in current thread synchronously or 
through pool,
+            // depending async flag, so must go the last
             // to allow remote nodes execute the requested operation in 
parallel.
             if (locNode != null)
-                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+                send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
         }
     }
 
@@ -1687,7 +1691,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC, false);
     }
 
     /**
@@ -1724,7 +1728,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, 
skipOnTimeout, null);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, 
skipOnTimeout, null, false);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection 
[topic=" + topic + ", msg=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index c2cfa76..f5499d3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends 
GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, 
GridTopic.TOPIC_IGFS, false, 123L);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, 
GridTopic.TOPIC_IGFS, false, 123L, false);
         }
         catch (IgniteCheckedException ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends 
GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, 
GridTopic.TOPIC_IGFS, true, 123L);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, 
GridTopic.TOPIC_IGFS, true, 123L, false);
         }
         catch (Exception ignored) {
             // No-op. We are using mocks so real sending is impossible.
@@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(ClusterNode node, GridTopic topic, Message 
msg, byte plc)
+        @Override public void send(ClusterNode node, GridTopic topic, Message 
msg, byte plc, boolean async)
             throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6980abb/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index 31b0663..fe85970 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -58,7 +58,18 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
     public void testLocalServer() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                localServerInternal();
+                localServerInternal(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalServerAsync() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                localServerInternal(true);
             }
         });
     }
@@ -83,7 +94,21 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientMessage();
+                serverClientMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerClientMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                serverClientMessage(true);
             }
         });
     }
@@ -97,7 +122,21 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientMessage();
+                clientClientMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientClientMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientClientMessage(false);
             }
         });
     }
@@ -111,7 +150,21 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerMessage();
+                clientServerMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientServerMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientServerMessage(true);
             }
         });
     }
@@ -133,7 +186,18 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
     public void testOrderedMessage() throws Exception {
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                orderedMessage();
+                orderedMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOrderedMessageAsync() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                orderedMessage(true);
             }
         });
     }
@@ -147,7 +211,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientServerOrderedMessage();
+                clientServerOrderedMessage(false);
             }
         });
     }
@@ -155,13 +219,42 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
     /**
      * @throws Exception If failed.
      */
+    public void testClientServerOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientServerOrderedMessage(true);
+            }
+        });
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientClientOrderedMessage() throws Exception {
         if (!testsCfg.withClients())
             return;
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                clientClientOrderedMessage();
+                clientClientOrderedMessage(true);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientClientOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                clientClientOrderedMessage(true);
             }
         });
     }
@@ -175,7 +268,21 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         runInAllDataModes(new TestRunnable() {
             @Override public void run() throws Exception {
-                serverClientOrderedMessage();
+                serverClientOrderedMessage(false);
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerClientOrderedMessageAsync() throws Exception {
+        if (!testsCfg.withClients())
+            return;
+
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                serverClientOrderedMessage(true);
             }
         });
     }
@@ -184,7 +291,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      * Single server test.
      * @throws Exception If failed.
      */
-    private void localServerInternal() throws Exception {
+    private void localServerInternal(boolean async) throws Exception {
         int messages = MSGS;
 
         Ignite ignite = grid(SERVER_NODE_IDX);
@@ -197,7 +304,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i));
+                sendMessage(ignite, grp, value(i), async);
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -240,42 +347,43 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      * Server sends a message and client receives it.
      * @throws Exception If failed.
      */
-    private void serverClientMessage() throws Exception {
+    private void serverClientMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
      * Client sends a message and client receives it.
      * @throws Exception If failed.
      */
-    private void clientClientMessage() throws Exception {
+    private void clientClientMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
      * Client sends a message and client receives it.
+     * @param async flag async mode
      * @throws Exception If failed.
      */
-    private void clientServerMessage() throws Exception {
+    private void clientServerMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendMessages(ignite, grp);
+        registerListenerAndSendMessages(ignite, grp, async);
     }
 
     /**
@@ -283,7 +391,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      * @param grp Cluster group.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup 
grp) throws Exception {
+    private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup 
grp, boolean async) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -292,7 +400,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
 
         try {
             for (int i = 0; i < messages; i++)
-                sendMessage(ignite, grp, value(i));
+                sendMessage(ignite, grp, value(i), async);
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -338,56 +446,56 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      *
      * @throws Exception If fail.
      */
-    private void orderedMessage() throws Exception {
+    private void orderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : 
ignite.cluster().forLocal();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
      *
      * @throws Exception If fail.
      */
-    private void clientServerOrderedMessage() throws Exception {
+    private void clientServerOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forServers();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
      *
      * @throws Exception If fail.
      */
-    private void clientClientOrderedMessage() throws Exception {
+    private void clientClientOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(CLIENT_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
      *
      * @throws Exception If fail.
      */
-    private void serverClientOrderedMessage() throws Exception {
+    private void serverClientOrderedMessage(boolean async) throws Exception {
         Ignite ignite = grid(SERVER_NODE_IDX);
 
         ClusterGroup grp = ignite.cluster().forClients();
 
         assert grp.nodes().size() > 0;
 
-        registerListenerAndSendOrderedMessages(ignite, grp);
+        registerListenerAndSendOrderedMessages(ignite, grp, async);
     }
 
     /**
@@ -395,7 +503,7 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      * @param grp Cluster group.
      * @throws Exception If fail.
      */
-    private void registerListenerAndSendOrderedMessages(Ignite ignite, 
ClusterGroup grp) throws Exception {
+    private void registerListenerAndSendOrderedMessages(Ignite ignite, 
ClusterGroup grp, boolean async) throws Exception {
         int messages = MSGS;
 
         LATCH = new CountDownLatch(grp.nodes().size() * messages);
@@ -403,8 +511,12 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
         UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new 
OrderedMessageListener());
 
         try {
-            for (int i=0; i < messages; i++)
-                ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000);
+            for (int i=0; i < messages; i++){
+                if (async)
+                    ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, 
value(i), 2000);
+                else
+                    ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, 
value(i), 2000);
+            }
 
             assertTrue(LATCH.await(10, TimeUnit.SECONDS));
 
@@ -420,8 +532,11 @@ public class IgniteMessagingConfigVariationFullApiTest 
extends IgniteConfigVaria
      * @param grp Cluster group.
      * @param msg Message.
      */
-    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) {
-        nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
+    private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, 
boolean async) {
+        if (async)
+            nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg);
+        else
+            nodeSnd.message(grp).send(MESSAGE_TOPIC, msg);
     }
 
     /**

Reply via email to