IGNITE-3727 send message, local listeners will execute through thread pool

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e604242
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e604242
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e604242

Branch: refs/heads/ignite-3727-2
Commit: 7e6042420b804fffaed771767ba1f00e8bb5413d
Parents: ccae9b0
Author: DmitriyGovorukhin <[email protected]>
Authored: Thu Sep 8 15:42:35 2016 +0300
Committer: DmitriyGovorukhin <[email protected]>
Committed: Thu Sep 8 15:42:35 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e604242/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3fdda30..9d40bf9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -665,8 +665,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 case UTILITY_CACHE_POOL:
                 case MARSH_CACHE_POOL:
                 case IDX_POOL:
-                case IGFS_POOL:
-                {
+                case IGFS_POOL: {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
                     else
@@ -831,7 +830,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 finally {
                     threadProcessingMessage(false);
 
-                    msgC.run();
+                    if (msgC != null)
+                        msgC.run();
                 }
             }
         };
@@ -1290,7 +1290,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
-                processRegularMessage0(ioMsg, locNodeId);
+                processRegularMessage(locNodeId, ioMsg, plc, null);
 
             if (ackC != null)
                 ackC.apply(null);
@@ -1540,7 +1540,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackC);
     }
 
-     /**
+    /**
      * Sends a peer deployable user message.
      *
      * @param nodes Destination nodes.
@@ -1775,7 +1775,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         GridMessageListener lsnrs;
 
-        for (;;) {
+        for (; ; ) {
             lsnrs = listenerPutIfAbsent0(topic, lsnr);
 
             if (lsnrs == null) {
@@ -1888,7 +1888,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 msgSets = map.values();
         }
         else {
-            for (;;) {
+            for (; ; ) {
                 GridMessageListener lsnrs = listenerGet0(topic);
 
                 // If removing listener before subscription happened.
@@ -1952,8 +1952,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (rmv && log.isDebugEnabled())
             log.debug("Removed message listener [topic=" + topic + ", lsnr=" + 
lsnr + ']');
 
-        if (lsnr instanceof ArrayListener)
-        {
+        if (lsnr instanceof ArrayListener) {
             for (GridMessageListener childLsnr : ((ArrayListener)lsnr).arr)
                 closeListener(childLsnr);
         }
@@ -1965,6 +1964,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
     /**
      * Closes a listener, if applicable.
+     *
      * @param lsnr Listener.
      */
     private void closeListener(GridMessageListener lsnr) {
@@ -2124,7 +2124,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         /**
          * @param l Listener.
          * @return {@code true} if listener was added. Add can fail if this 
instance is empty and is about to be removed
-         *         from map.
+         * from map.
          */
         synchronized boolean add(GridMessageListener l) {
             GridMessageListener[] arr0 = arr;
@@ -2166,7 +2166,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"ConstantConditions",
+        @SuppressWarnings({
+            "SynchronizationOnLocalVariableOrMethodParameter", 
"ConstantConditions",
             "OverlyStrongTypeCast"})
         @Override public void onMessage(UUID nodeId, Object msg) {
             if (!(msg instanceof GridIoUserMessage)) {

Reply via email to