IGNITE-3727 send message, local listeners will execute through thread pool
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e604242 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e604242 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e604242 Branch: refs/heads/ignite-3727-2 Commit: 7e6042420b804fffaed771767ba1f00e8bb5413d Parents: ccae9b0 Author: DmitriyGovorukhin <[email protected]> Authored: Thu Sep 8 15:42:35 2016 +0300 Committer: DmitriyGovorukhin <[email protected]> Committed: Thu Sep 8 15:42:35 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e604242/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3fdda30..9d40bf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -665,8 +665,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case UTILITY_CACHE_POOL: case MARSH_CACHE_POOL: case IDX_POOL: - case IGFS_POOL: - { + case IGFS_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); else @@ -831,7 +830,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa finally { threadProcessingMessage(false); - msgC.run(); + if (msgC != null) + msgC.run(); } } }; @@ -1290,7 +1290,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); else - processRegularMessage0(ioMsg, locNodeId); + processRegularMessage(locNodeId, ioMsg, plc, null); if (ackC != null) ackC.apply(null); @@ -1540,7 +1540,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); } - /** + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@ -1775,7 +1775,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa GridMessageListener lsnrs; - for (;;) { + for (; ; ) { lsnrs = listenerPutIfAbsent0(topic, lsnr); if (lsnrs == null) { @@ -1888,7 +1888,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa msgSets = map.values(); } else { - for (;;) { + for (; ; ) { GridMessageListener lsnrs = listenerGet0(topic); // If removing listener before subscription happened. @@ -1952,8 +1952,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (rmv && log.isDebugEnabled()) log.debug("Removed message listener [topic=" + topic + ", lsnr=" + lsnr + ']'); - if (lsnr instanceof ArrayListener) - { + if (lsnr instanceof ArrayListener) { for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr) closeListener(childLsnr); } @@ -1965,6 +1964,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * Closes a listener, if applicable. + * * @param lsnr Listener. */ private void closeListener(GridMessageListener lsnr) { @@ -2124,7 +2124,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * @param l Listener. * @return {@code true} if listener was added. Add can fail if this instance is empty and is about to be removed - * from map. + * from map. */ synchronized boolean add(GridMessageListener l) { GridMessageListener[] arr0 = arr; @@ -2166,7 +2166,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** {@inheritDoc} */ - @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", + @SuppressWarnings({ + "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", "OverlyStrongTypeCast"}) @Override public void onMessage(UUID nodeId, Object msg) { if (!(msg instanceof GridIoUserMessage)) {
