Repository: ignite Updated Branches: refs/heads/ignite-1.9 bea863f44 -> 9d811f1b5
Revert: ignite-3727 Support local listeners async execution for IgniteMessage.send Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d811f1b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d811f1b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d811f1b Branch: refs/heads/ignite-1.9 Commit: 9d811f1b56a07a463f9ca88535581097477929b2 Parents: bea863f Author: Yakov Zhdanov <[email protected]> Authored: Wed Mar 1 19:20:18 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Wed Mar 1 19:20:18 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteMessaging.java | 13 +- .../ignite/internal/IgniteMessagingImpl.java | 8 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../managers/communication/GridIoManager.java | 55 +- .../communication/GridIoManagerSelfTest.java | 8 +- ...niteMessagingConfigVariationFullApiTest.java | 195 ++----- .../ignite/messaging/GridMessagingSelfTest.java | 116 +--- .../messaging/IgniteMessagingSendAsyncTest.java | 544 ------------------- .../ignite/testsuites/IgniteBasicTestSuite.java | 2 - .../hadoop/shuffle/HadoopShuffle.java | 4 +- 10 files changed, 77 insertions(+), 870 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index e64ded5..ff52ed8 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -77,10 +77,6 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. - * <p> - * By default all local listeners will be executed in the calling thread, or if you use - * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's - * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msg Message to send. @@ -91,10 +87,6 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given messages with the specified topic to the nodes in the underlying cluster group. - * <p> - * By default all local listeners will be executed in the calling thread, or if you use - * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's - * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msgs Messages to send. Order of the sending is undefined. If the method produces @@ -107,8 +99,7 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with * this method will arrive in the same order they were sent. Note that if a topic is used - * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners - * are always executed in public thread pool, no matter default or {@link #withAsync()} mode is used. + * for ordered messages, then it cannot be reused for non-ordered messages. * <p> * The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue, * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered @@ -171,4 +162,4 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** {@inheritDoc} */ @Override IgniteMessaging withAsync(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 541fad4..6b33aa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (snapshot.isEmpty()) throw U.emptyTopologyException(); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> for (Object msg : msgs) { A.notNull(msg, "msg"); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); } } catch (IgniteCheckedException e) { @@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> if (timeout == 0) timeout = ctx.config().getNetworkTimeout(); - ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false); + ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -254,4 +254,4 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> protected Object readResolve() throws ObjectStreamException { return prj.message(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 5992eda..584cc56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -390,7 +390,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan if (msg instanceof Message) ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL); else - ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false); + ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0); } catch (IgniteCheckedException e) { throw unwrapException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 84b4543..7ef7bc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -785,8 +785,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa finally { threadProcessingMessage(false); - if (msgC != null) - msgC.run(); + msgC.run(); } } @@ -1238,7 +1237,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. * @param ackC Ack closure. - * @param async If {@code true} message for local node will be processed in pool, otherwise in current thread. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -1250,8 +1248,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean ordered, long timeout, boolean skipOnTimeout, - IgniteInClosure<IgniteException> ackC, - boolean async + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1269,11 +1266,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); - else if (async) { - assert msg instanceof GridIoUserMessage : ioMsg; // Async execution was added only for IgniteMessaging. - - processRegularMessage(locNodeId, ioMsg, plc, null); - } else processRegularMessage0(ioMsg, locNodeId); @@ -1331,7 +1323,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1343,7 +1335,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, null, false); + send(node, topic, -1, msg, plc, false, 0, false, null); } /** @@ -1351,12 +1343,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. - * @param async Async flag. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) + public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1369,7 +1360,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topicOrd, msg, plc, false, 0, false, null, false); + send(node, topic, topicOrd, msg, plc, false, 0, false, null); } /** @@ -1391,7 +1382,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** @@ -1418,7 +1409,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** @@ -1431,7 +1422,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); } /** @@ -1467,7 +1458,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackC, false); + send(node, topic, -1, msg, plc, false, 0, false, ackC); } /** @@ -1523,10 +1514,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } - /** + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@ -1534,7 +1525,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @throws IgniteCheckedException Thrown in case of any errors. */ public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException { - sendUserMessage(nodes, msg, null, false, 0, false); + sendUserMessage(nodes, msg, null, false, 0); } /** @@ -1545,12 +1536,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topic Message topic to use. * @param ordered Is message ordered? * @param timeout Message timeout in milliseconds for ordered messages. - * @param async Async flag. * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("ConstantConditions") public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, - @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException { + @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException { boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId); byte[] serMsg = null; @@ -1595,7 +1585,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true); else if (loc) - send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async); + send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); else { ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId)); @@ -1604,11 +1594,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!rmtNodes.isEmpty()) send(rmtNodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); - // Will call local listeners in current thread synchronously or through pool, - // depending async flag, so must go the last + // Will call local listeners in current thread synchronously, so must go the last // to allow remote nodes execute the requested operation in parallel. if (locNode != null) - send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async); + send(locNode, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); } } @@ -1675,7 +1664,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } /** @@ -1712,7 +1701,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + @@ -1940,7 +1929,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (rmv && log.isDebugEnabled()) log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']'); - if (lsnr instanceof ArrayListener) { + if (lsnr instanceof ArrayListener) + { for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) closeListener(childLsnr); } @@ -1952,7 +1942,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * Closes a listener, if applicable. - * * @param lsnr Listener. */ private void closeListener(GridMessageListener lsnr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index f5499d3..2039d81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -145,7 +145,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L, false); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, false, 123L); } catch (IgniteCheckedException ignored) { // No-op. We are using mocks so real sending is impossible. @@ -169,7 +169,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); try { - ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L, false); + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_IGFS, true, 123L); } catch (Exception ignored) { // No-op. We are using mocks so real sending is impossible. @@ -196,7 +196,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async) + @Override public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { // No-op. } @@ -257,4 +257,4 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { return 0; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java index 49aab10..31b0663 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java @@ -58,18 +58,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testLocalServer() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - localServerInternal(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testLocalServerAsync() throws Exception { - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - localServerInternal(true); + localServerInternal(); } }); } @@ -94,21 +83,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testServerClientMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - serverClientMessage(true); + serverClientMessage(); } }); } @@ -122,21 +97,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testClientClientMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientClientMessage(true); + clientClientMessage(); } }); } @@ -150,21 +111,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testClientServerMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientServerMessage(true); + clientServerMessage(); } }); } @@ -186,18 +133,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria public void testOrderedMessage() throws Exception { runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - orderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testOrderedMessageAsync() throws Exception { - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - orderedMessage(true); + orderedMessage(); } }); } @@ -211,7 +147,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientServerOrderedMessage(false); + clientServerOrderedMessage(); } }); } @@ -219,42 +155,13 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * @throws Exception If failed. */ - public void testClientServerOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientServerOrderedMessage(true); - } - }); - } - - - /** - * @throws Exception If failed. - */ public void testClientClientOrderedMessage() throws Exception { if (!testsCfg.withClients()) return; runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - clientClientOrderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testClientClientOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - clientClientOrderedMessage(true); + clientClientOrderedMessage(); } }); } @@ -268,32 +175,16 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria runInAllDataModes(new TestRunnable() { @Override public void run() throws Exception { - serverClientOrderedMessage(false); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testServerClientOrderedMessageAsync() throws Exception { - if (!testsCfg.withClients()) - return; - - runInAllDataModes(new TestRunnable() { - @Override public void run() throws Exception { - serverClientOrderedMessage(true); + serverClientOrderedMessage(); } }); } /** * Single server test. - * - * @param async Async message send flag. * @throws Exception If failed. */ - private void localServerInternal(boolean async) throws Exception { + private void localServerInternal() throws Exception { int messages = MSGS; Ignite ignite = grid(SERVER_NODE_IDX); @@ -306,7 +197,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i), async); + sendMessage(ignite, grp, value(i)); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -347,59 +238,52 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria /** * Server sends a message and client receives it. - * - * @param async Async message send flag. * @throws Exception If failed. */ - private void serverClientMessage(boolean async) throws Exception { + private void serverClientMessage() throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp, async); + registerListenerAndSendMessages(ignite, grp); } /** * Client sends a message and client receives it. - * - * @param async Async message send flag. * @throws Exception If failed. */ - private void clientClientMessage(boolean async) throws Exception { + private void clientClientMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp, async); + registerListenerAndSendMessages(ignite, grp); } /** * Client sends a message and client receives it. - * - * @param async Async message send flag. * @throws Exception If failed. */ - private void clientServerMessage(boolean async) throws Exception { + private void clientServerMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendMessages(ignite, grp, async); + registerListenerAndSendMessages(ignite, grp); } /** * @param ignite Ignite. * @param grp Cluster group. - * @param async Async message send flag. * @throws Exception If fail. */ - private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { + private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -408,7 +292,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria try { for (int i = 0; i < messages; i++) - sendMessage(ignite, grp, value(i), async); + sendMessage(ignite, grp, value(i)); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -451,68 +335,67 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria } /** - * @param async Async message send flag. + * * @throws Exception If fail. */ - private void orderedMessage(boolean async) throws Exception { + private void orderedMessage() throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = gridCount() > 1 ? ignite.cluster().forRemotes() : ignite.cluster().forLocal(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. + * * @throws Exception If fail. */ - private void clientServerOrderedMessage(boolean async) throws Exception { + private void clientServerOrderedMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forServers(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. + * * @throws Exception If fail. */ - private void clientClientOrderedMessage(boolean async) throws Exception { + private void clientClientOrderedMessage() throws Exception { Ignite ignite = grid(CLIENT_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** - * @param async Async message send flag. + * * @throws Exception If fail. */ - private void serverClientOrderedMessage(boolean async) throws Exception { + private void serverClientOrderedMessage() throws Exception { Ignite ignite = grid(SERVER_NODE_IDX); ClusterGroup grp = ignite.cluster().forClients(); assert grp.nodes().size() > 0; - registerListenerAndSendOrderedMessages(ignite, grp, async); + registerListenerAndSendOrderedMessages(ignite, grp); } /** * @param ignite Ignite. * @param grp Cluster group. - * @param async Async message send flag. * @throws Exception If fail. */ - private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception { + private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp) throws Exception { int messages = MSGS; LATCH = new CountDownLatch(grp.nodes().size() * messages); @@ -520,12 +403,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria UUID opId = ignite.message(grp).remoteListen(MESSAGE_TOPIC, new OrderedMessageListener()); try { - for (int i=0; i < messages; i++){ - if (async) - ignite.message(grp).withAsync().sendOrdered(MESSAGE_TOPIC, value(i), 2000); - else - ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); - } + for (int i=0; i < messages; i++) + ignite.message(grp).sendOrdered(MESSAGE_TOPIC, value(i), 2000); assertTrue(LATCH.await(10, TimeUnit.SECONDS)); @@ -540,13 +419,9 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria * @param nodeSnd Sender Ignite node. * @param grp Cluster group. * @param msg Message. - * @param async Async message send flag. */ - private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) { - if (async) - nodeSnd.message(grp).withAsync().send(MESSAGE_TOPIC, msg); - else - nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); + private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg) { + nodeSnd.message(grp).send(MESSAGE_TOPIC, msg); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index a166c3d..5a0dfa2 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -24,7 +24,6 @@ import java.io.ObjectOutput; import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -37,20 +36,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; -import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.resources.IgniteInstanceResource;; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -204,7 +198,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setIpFinder(ipFinder); @@ -950,7 +944,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser * @throws Exception If error occurs. */ public void testSendMessageWithExternalClassLoader() throws Exception { - URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}; + URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) }; ClassLoader extLdr = new URLClassLoader(urls); @@ -1034,8 +1028,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser public void testAsync() throws Exception { final AtomicInteger msgCnt = new AtomicInteger(); - TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi(); - assertFalse(ignite2.message().isAsync()); final IgniteMessaging msg = ignite2.message().withAsync(); @@ -1052,8 +1044,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - discoSpi.blockCustomEvent(); - final String topic = "topic"; UUID id = msg.remoteListen(topic, new P2<UUID, Object>() { @@ -1069,15 +1059,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertNull(id); - IgniteFuture<UUID> starFut = msg.future(); - - Assert.assertNotNull(starFut); - - U.sleep(500); + IgniteFuture<UUID> fut = msg.future(); - Assert.assertFalse(starFut.isDone()); - - discoSpi.stopBlock(); + Assert.assertNotNull(fut); GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { @@ -1087,14 +1071,10 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - id = starFut.get(); + id = fut.get(); Assert.assertNotNull(id); - Assert.assertTrue(starFut.isDone()); - - discoSpi.blockCustomEvent(); - message(ignite1.cluster().forRemotes()).send(topic, "msg1"); GridTestUtils.waitForCondition(new PA() { @@ -1119,16 +1099,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - U.sleep(500); - - Assert.assertFalse(stopFut.isDone()); - - discoSpi.stopBlock(); - stopFut.get(); - Assert.assertTrue(stopFut.isDone()); - message(ignite1.cluster().forRemotes()).send(topic, "msg2"); U.sleep(1000); @@ -1137,80 +1109,6 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** - * - */ - static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private boolean blockCustomEvt; - - /** */ - private final Object mux = new Object(); - - /** */ - private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - synchronized (mux) { - if (blockCustomEvt) { - DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); - if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) { - log.info("Block custom message: " + msg0); - blockedMsgs.add(msg); - - mux.notifyAll(); - } - return; - } - } - - super.sendCustomEvent(msg); - } - - /** - * - */ - public void blockCustomEvent() { - synchronized (mux) { - assert blockedMsgs.isEmpty() : blockedMsgs; - - blockCustomEvt = true; - } - } - - /** - * @throws InterruptedException If interrupted. - */ - public void waitCustomEvent() throws InterruptedException { - synchronized (mux) { - while (blockedMsgs.isEmpty()) - mux.wait(); - } - } - - /** - * - */ - public void stopBlock() { - List<DiscoverySpiCustomMessage> msgs; - - synchronized (this) { - msgs = new ArrayList<>(blockedMsgs); - - blockCustomEvt = false; - - blockedMsgs.clear(); - } - - for (DiscoverySpiCustomMessage msg : msgs) { - log.info("Resend blocked message: " + msg); - - super.sendCustomEvent(msg); - } - } - } - - /** * Tests that message listener registers only for one oldest node. * * @throws Exception If an error occurred. @@ -1254,4 +1152,4 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser assertEquals(1, MSG_CNT.get()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java deleted file mode 100644 index 75e7d22..0000000 --- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.messaging; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ThreadLocalRandom8; -import org.junit.Assert; - -/** - * - */ -public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable { - /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Threads number for multi-thread tests. */ - private static final int THREADS = 10; - - /** */ - private final String TOPIC = "topic"; - - /** */ - private final String msgStr = "message"; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - super.afterTest(); - } - - /** - * Checks if use default mode, local listeners execute in the same thread, 1 node in topology. - * - * @throws Exception If failed. - */ - public void testSendDefaultMode() throws Exception { - Ignite ignite1 = startGrid(1); - - send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { - @Override public void apply(String msg, Thread thread) { - Assert.assertEquals(Thread.currentThread(), thread); - Assert.assertEquals(msgStr, msg); - } - }); - } - - /** - * Checks if use async mode, local listeners execute in another thread, 1 node in topology. - * - * @throws Exception If failed. - */ - public void testSendAsyncMode() throws Exception { - Ignite ignite1 = startGrid(1); - - send(ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { - @Override public void apply(String msg, Thread thread) { - Assert.assertTrue(!Thread.currentThread().equals(thread)); - Assert.assertEquals(msgStr, msg); - } - }); - } - - /** - * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology. - * - * @throws Exception If failed. - */ - public void testSendDefaultMode2Nodes() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { - @Override public void apply(String msg, Thread thread) { - Assert.assertEquals(Thread.currentThread(), thread); - Assert.assertEquals(msgStr, msg); - } - }); - } - - /** - * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology. - * - * @throws Exception If failed. - */ - public void testSendAsyncMode2Node() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { - @Override public void apply(String msg, Thread thread) { - Assert.assertTrue(!Thread.currentThread().equals(thread)); - Assert.assertEquals(msgStr, msg); - } - }); - } - - /** - * Checks that sendOrdered works in thread pool, 1 node in topology. - * - * @throws Exception If failed. - */ - public void testSendOrderedDefaultMode() throws Exception { - Ignite ignite1 = startGrid(1); - - final List<String> msgs = orderedMessages(); - - sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { - @Override public void apply(List<String> received, List<Thread> threads) { - assertFalse(threads.contains(Thread.currentThread())); - assertTrue(msgs.equals(received)); - } - }); - } - - /** - * Checks that sendOrdered work in thread pool, 1 node in topology. - * - * @throws Exception If failed. - */ - public void testSendOrderedAsyncMode() throws Exception { - Ignite ignite1 = startGrid(1); - - final List<String> msgs = orderedMessages(); - - sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { - @Override public void apply(List<String> received, List<Thread> threads) { - assertFalse(threads.contains(Thread.currentThread())); - assertTrue(msgs.equals(received)); - } - }); - } - - /** - * Checks that sendOrdered work in thread pool, 2 nodes in topology. - * - * @throws Exception If failed. - */ - public void testSendOrderedDefaultMode2Node() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - final List<String> msgs = orderedMessages(); - - sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { - @Override public void apply(List<String> received, List<Thread> threads) { - assertFalse(threads.contains(Thread.currentThread())); - assertTrue(msgs.equals(received)); - } - }); - } - - /** - * Checks that sendOrdered work in thread pool, 2 nodes in topology. - * - * @throws Exception If failed. - */ - public void testSendOrderedAsyncMode2Node() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - final List<String> msgs = orderedMessages(); - - sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { - @Override public void apply(List<String> received, List<Thread> threads) { - assertFalse(threads.contains(Thread.currentThread())); - assertTrue(msgs.equals(received)); - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testSendOrderedDefaultModeMultiThreads() throws Exception { - Ignite ignite = startGrid(1); - - sendOrderedMultiThreads(ignite.message()); - } - - /** - * @throws Exception If failed. - */ - public void testSendOrderedAsyncModeMultiThreads() throws Exception { - Ignite ignite = startGrid(1); - - sendOrderedMultiThreads(ignite.message().withAsync()); - } - - /** - * @throws Exception If failed. - */ - public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message()); - } - - /** - * @throws Exception If failed. - */ - public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception { - Ignite ignite1 = startGrid(1); - Ignite ignite2 = startGrid(2); - - sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync()); - } - - /** - * @param ignite2 Second node. - * @param ignMsg IgniteMessage. - * @throws Exception If failed. - */ - private void sendOrderedMultiThreadsWith2Node( - final Ignite ignite2, - final IgniteMessaging ignMsg - ) throws Exception { - final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); - final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); - - final List<String> msgs = orderedMessages(); - - sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs); - - } - - /** - * @param ignMsg IgniteMessaging. - * @throws Exception If failed. - */ - private void sendOrderedMultiThreads( - final IgniteMessaging ignMsg - ) throws Exception { - final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap(); - final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap(); - - final List<String> msgs = orderedMessages(); - - sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); - } - - /** - * @param ignite2 Second node. - * @param ignMsg Ignite for send message. - * @param expMsg Expected messages map. - * @param actlMsg Actual message map. - * @param msgs List of messages. - * @throws Exception If failed. - */ - private void sendOrderedMultiThreadsWith2Node( - final Ignite ignite2, - final IgniteMessaging ignMsg, - final ConcurrentMap<String, List<String>> expMsg, - final ConcurrentMap<String, List<String>> actlMsg, - final List<String> msgs - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); - - final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap(); - - ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { - @Override public boolean apply(UUID uuid, Message msg) { - actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); - - actlMsgNode2.get(msg.threadName).add(msg.msg); - - latch.countDown(); - - return true; - } - }); - - sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); - - latch.await(); - - assertEquals(expMsg.size(), actlMsgNode2.size()); - - for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) - assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue())); - } - - /** - * @param ignMsg Ignite for send message. - * @param expMsg Expected messages map. - * @param actlMsg Actual message map. - * @param msgs List of messages. - * @throws Exception If failed. - */ - private void sendOrderedMultiThreads( - final IgniteMessaging ignMsg, - final ConcurrentMap<String, List<String>> expMsg, - final ConcurrentMap<String, List<String>> actlMsg, - final List<String> msgs - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); - - ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() { - @Override public boolean apply(UUID uuid, Message msg) { - actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList()); - - actlMsg.get(msg.threadName).add(msg.msg); - - latch.countDown(); - - return true; - } - }); - - for (int i = 0; i < THREADS; i++) - new Thread(new Runnable() { - @Override public void run() { - String thdName = Thread.currentThread().getName(); - - List<String> exp = Lists.newArrayList(); - - expMsg.put(thdName, exp); - - for (String msg : msgs) { - exp.add(msg); - - ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000); - } - - } - }).start(); - - latch.await(); - - assertEquals(expMsg.size(), actlMsg.size()); - - for (Map.Entry<String, List<String>> entry : expMsg.entrySet()) - assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue())); - } - - /** - * @param ignite2 Second node. - * @param igniteMsg Ignite message. - * @param msgStr Message string. - * @param cls Callback for compare result. - * @throws Exception If failed. - */ - private void sendWith2Nodes( - final Ignite ignite2, - final IgniteMessaging igniteMsg, - final String msgStr, - final IgniteBiInClosure<String, Thread> cls - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - - ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { - @Override public boolean apply(UUID uuid, String msg) { - Assert.assertEquals(msgStr, msg); - - latch.countDown(); - - return true; - } - }); - - send(igniteMsg, msgStr, cls); - - latch.await(); - } - - /** - * @param igniteMsg Ignite messaging. - * @param msgStr Message string. - * @param cls Callback for compare result. - * @throws Exception If failed. - */ - private void send( - final IgniteMessaging igniteMsg, - final String msgStr, - final IgniteBiInClosure<String, Thread> cls - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - - final AtomicReference<Thread> thread = new AtomicReference<>(); - final AtomicReference<String> val = new AtomicReference<>(); - - igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { - @Override public boolean apply(UUID uuid, String msgStr) { - thread.set(Thread.currentThread()); - - val.set(msgStr); - - latch.countDown(); - - return true; - } - }); - - igniteMsg.send(TOPIC, msgStr); - - latch.await(); - - cls.apply(val.get(), thread.get()); - } - - /** - * @param ignite2 Second node. - * @param igniteMsg Ignite message. - * @param msgs messages for send. - * @param cls Callback for compare result. - * @throws Exception If failed. - */ - private void sendOrderedWith2Node( - final Ignite ignite2, - final IgniteMessaging igniteMsg, - final List<String> msgs, - final IgniteBiInClosure<List<String>, List<Thread>> cls - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(msgs.size()); - - final List<String> received = Lists.newArrayList(); - - ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>() { - @Override public boolean apply(UUID uuid, String msg) { - received.add(msg); - - latch.countDown(); - - return true; - } - }); - - sendOrdered(igniteMsg, msgs, cls); - - latch.await(); - - assertTrue(msgs.equals(received)); - } - - /** - * @param igniteMsg Ignite message. - * @param msgs messages for send. - * @param cls Callback for compare result. - * @throws Exception If failed. - */ - private<T> void sendOrdered( - final IgniteMessaging igniteMsg, - final List<T> msgs, - final IgniteBiInClosure<List<T>,List<Thread>> cls - ) throws Exception { - final CountDownLatch latch = new CountDownLatch(msgs.size()); - - final List<T> received = Lists.newArrayList(); - final List<Thread> threads = Lists.newArrayList(); - - for (T msg : msgs) - igniteMsg.sendOrdered(TOPIC, msg, 1000); - - igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() { - @Override public boolean apply(UUID uuid, T s) { - received.add(s); - - threads.add(Thread.currentThread()); - - latch.countDown(); - - return true; - } - }); - - latch.await(); - - cls.apply(received, threads); - } - - /** - * @return List of ordered messages - */ - private List<String> orderedMessages() { - final List<String> msgs = Lists.newArrayList(); - - for (int i = 0; i < 1000; i++) - msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt())); - - return msgs; - } - - /** - * - */ - private static class Message implements Serializable{ - /** Thread name. */ - private final String threadName; - - /** Message. */ - private final String msg; - - /** - * @param threadName Thread name. - * @param msg Message. - */ - private Message(String threadName, String msg) { - this.threadName = threadName; - this.msg = msg; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 688edf7..9e20d2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -56,7 +56,6 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest; import org.apache.ignite.marshaller.MarshallerContextSelfTest; import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest; import org.apache.ignite.messaging.GridMessagingSelfTest; -import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest; import org.apache.ignite.messaging.IgniteMessagingWithClientTest; import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest; import org.apache.ignite.spi.GridSpiLocalHostInjectionTest; @@ -102,7 +101,6 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(new TestSuite(GridSelfTest.class)); suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class)); suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); - suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class)); GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d811f1b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 3db68c4..10f18a6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent { if (msg instanceof Message) ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL); else - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false); + ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); } /** @@ -298,4 +298,4 @@ public class HadoopShuffle extends HadoopComponent { public GridUnsafeMemory memory() { return mem; } -} \ No newline at end of file +}
