http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3d9238a,7ef7bc0..cda1321 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@@ -830,14 -785,36 +784,37 @@@ public class GridIoManager extends Grid finally { threadProcessingMessage(false); - msgC.run(); + if (msgC != null) + msgC.run(); } } + + @Override public String toString() { + return "Message closure [msg=" + msg + ']'; + } }; + if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { + IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); + + if (msg0.processFromNioThread()) { + c.run(); + + return; + } + } + + if (ctx.config().getStripedPoolSize() > 0 && + plc == GridIoPolicy.SYSTEM_POOL && + msg.partition() != Integer.MIN_VALUE + ) { + ctx.getStripedExecutorService().execute(msg.partition(), c); + + return; + } + try { - pool(plc).execute(c); + pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " + http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 3eb7e5f,9e20d2a..688edf7 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@@ -51,10 -53,11 +53,12 @@@ import org.apache.ignite.internal.produ import org.apache.ignite.internal.util.nio.IgniteExceptionInNioWorkerSelfTest; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest; + import org.apache.ignite.marshaller.MarshallerContextSelfTest; import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest; import org.apache.ignite.messaging.GridMessagingSelfTest; +import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest; import org.apache.ignite.messaging.IgniteMessagingWithClientTest; + import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest; import org.apache.ignite.spi.GridSpiLocalHostInjectionTest; import org.apache.ignite.startup.properties.NotStringSystemPropertyTest; import org.apache.ignite.testframework.GridTestUtils; http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index d1b9eaa,8ffea8c..3db68c4 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@@ -117,7 -144,10 +144,10 @@@ public class HadoopShuffle extends Hado private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { ClusterNode node = ctx.kernalContext().discovery().node(nodeId); - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false); + if (msg instanceof Message) + ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL); + else - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); ++ ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false); } /**
