Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 0a98fc657 -> b53950ac9


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b53950ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b53950ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b53950ac

Branch: refs/heads/ignite-5075
Commit: b53950ac9ab815072c5bc2005a626ea7af2b198d
Parents: 0a98fc6
Author: sboikov <[email protected]>
Authored: Fri May 12 13:20:27 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri May 12 13:20:27 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   8 +-
 .../cache/CacheGroupInfrastructure.java         |   4 +-
 .../cache/GridCacheGroupIdMessage.java          |   5 +
 .../processors/cache/GridCacheIdMessage.java    |   5 +
 .../processors/cache/GridCacheIoManager.java    | 142 ++++++++++++++-----
 .../processors/cache/GridCacheMessage.java      |   5 +
 .../GridCachePartitionExchangeManager.java      |  16 +--
 .../cache/GridCacheSharedContext.java           |   4 +-
 .../GridChangeGlobalStateMessageResponse.java   |   5 +
 .../GridDistributedTxFinishResponse.java        |   5 +
 .../distributed/dht/GridDhtCacheAdapter.java    |   2 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  14 +-
 .../dht/GridDhtTxOnePhaseCommitAckRequest.java  |   5 +
 .../dht/atomic/GridDhtAtomicCache.java          |  23 ++-
 .../dht/colocated/GridDhtColocatedCache.java    |   6 +-
 .../GridDhtPartitionsAbstractMessage.java       |   5 +
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../distributed/near/GridNearAtomicCache.java   |   2 +-
 .../near/GridNearTransactionalCache.java        |   4 +-
 .../query/GridCacheDistributedQueryManager.java |  10 +-
 .../continuous/CacheContinuousQueryManager.java |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |  22 +--
 .../cache/transactions/TxLocksRequest.java      |   5 +
 .../cache/transactions/TxLocksResponse.java     |   5 +
 .../cluster/GridClusterStateProcessor.java      |   4 +-
 .../GridCacheConditionalDeploymentSelfTest.java |   5 +
 .../processors/cache/IgniteCacheGroupsTest.java |   4 +-
 .../communication/GridCacheMessageSelfTest.java |  15 ++
 28 files changed, 242 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 11253fc..d9c6071 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -310,7 +310,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         final Integer grpId = grp.groupId();
 
         if (!grpHolders.containsKey(grp.groupId())) {
-            cctx.io().addHandler(grpId, 
GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addHandler(true, grpId, 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                         processAffinityAssignmentResponse(grpId, nodeId, res);
@@ -469,7 +469,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                     stoppedGrps.add(cacheGrp.groupId());
 
-                    cctx.io().removeHandler(cacheGrp.groupId(), 
GridDhtAffinityAssignmentResponse.class);
+                    cctx.io().removeHandler(true, cacheGrp.groupId(), 
GridDhtAffinityAssignmentResponse.class);
                 }
             }
         }
@@ -1139,7 +1139,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
 
                 if (grp == null) {
-                    cctx.io().addHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
+                    cctx.io().addHandler(true, desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                         new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                             @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                                 processAffinityAssignmentResponse(grpId, 
nodeId, res);
@@ -1231,7 +1231,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         final CacheGroupInfrastructure grp = 
cctx.cache().cacheGroup(desc.groupId());
 
         if (grp == null) {
-            cctx.io().addHandler(desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
+            cctx.io().addHandler(true, desc.groupId(), 
GridDhtAffinityAssignmentResponse.class,
                 new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentResponse>() {
                     @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentResponse res) {
                         processAffinityAssignmentResponse(desc.groupId(), 
nodeId, res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 1b5187e..94d4357 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -378,6 +378,8 @@ public class CacheGroupInfrastructure {
      */
     void stopGroup() {
         offheapMgr.stop();
+
+        ctx.io().removeCacheGroupHandlers(grpId);
     }
 
     /**
@@ -406,7 +408,7 @@ public class CacheGroupInfrastructure {
             top = new GridDhtPartitionTopologyImpl(ctx, this, entryFactory);
 
             if (!ctx.kernalContext().clientNode()) {
-                ctx.io().addHandler(groupId(), 
GridDhtAffinityAssignmentRequest.class,
+                ctx.io().addHandler(true, groupId(), 
GridDhtAffinityAssignmentRequest.class,
                     new IgniteBiInClosure<UUID, 
GridDhtAffinityAssignmentRequest>() {
                         @Override public void apply(UUID nodeId, 
GridDhtAffinityAssignmentRequest msg) {
                             processAffinityAssignmentRequest(nodeId, msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
index 29a978e..67ca115 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGroupIdMessage.java
@@ -39,6 +39,11 @@ public abstract class GridCacheGroupIdMessage extends 
GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public final int handlerId() {
         return grpId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
index 25a553b..f27f3c2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
@@ -38,6 +38,11 @@ public abstract class GridCacheIdMessage extends 
GridCacheMessage {
         return cacheId;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      * @param cacheId Cache ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index fb2ac3d..d505825 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -110,16 +110,11 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /** Number of retries using to send messages. */
     private int retryCnt;
 
-    /** Indexed class handlers. */
-    private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new 
HashMap<>();
+    /** */
+    private final MessageHandlers cacheHandlers = new MessageHandlers();
 
-    /** Handler registry. */
-    private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, 
GridCacheMessage>>
-        clsHandlers = new ConcurrentHashMap8<>();
-
-    /** Ordered handler registry. */
-    private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends 
GridCacheMessage>> orderedHandlers =
-        new ConcurrentHashMap8<>();
+    /** */
+    private final MessageHandlers grpHandlers = new MessageHandlers();
 
     /** Stopping flag. */
     private boolean stopping;
@@ -259,12 +254,22 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings("unchecked")
     private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
+        handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? 
grpHandlers : cacheHandlers);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param cacheMsg Message.
+     * @param msgHandlers Message handlers.
+     */
+    @SuppressWarnings("unchecked")
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, 
MessageHandlers msgHandlers) {
         int msgIdx = cacheMsg.lookupIndex();
 
         IgniteBiInClosure<UUID, GridCacheMessage> c = null;
 
         if (msgIdx >= 0) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
             IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers0.get(cacheMsg.handlerId());
 
@@ -273,7 +278,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         }
 
         if (c == null)
-            c = clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), 
cacheMsg.getClass()));
+            c = msgHandlers.clsHandlers.get(new 
ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
 
         if (c == null) {
             IgniteLogger log = cacheMsg.messageLogger(cctx);
@@ -289,7 +294,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             msg0.append(U.nl()).append("Registered listeners:");
 
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
             for (Map.Entry<Integer, IgniteBiInClosure[]> e : 
idxClsHandlers0.entrySet())
                 
msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
@@ -322,7 +327,10 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridIO().removeMessageListener(TOPIC_CACHE);
 
-        for (Object ordTopic : orderedHandlers.keySet())
+        for (Object ordTopic : cacheHandlers.orderedHandlers.keySet())
+            cctx.gridIO().removeMessageListener(ordTopic);
+
+        for (Object ordTopic : grpHandlers.orderedHandlers.keySet())
             cctx.gridIO().removeMessageListener(ordTopic);
 
         boolean interrupted = false;
@@ -1180,19 +1188,35 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /**
      * Adds message handler.
      *
+     * @param cacheGrp {@code True} if cache group message, {@code false} if 
cache message.
      * @param hndId Message handler ID.
      * @param type Type of message.
      * @param c Handler.
      */
-    @SuppressWarnings({"unchecked"})
     public void addHandler(
+        boolean cacheGrp,
         int hndId,
         Class<? extends GridCacheMessage> type,
         IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        addHandler(hndId, type, c, cacheGrp ? grpHandlers : cacheHandlers);
+    }
+
+    /**
+     * @param hndId Message handler ID.
+     * @param type Type of message.
+     * @param c Handler.
+     * @param msgHandlers Message handlers.
+     */
+    @SuppressWarnings({"unchecked"})
+    private void addHandler(
+        int hndId,
+        Class<? extends GridCacheMessage> type,
+        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c,
+        MessageHandlers msgHandlers) {
         int msgIdx = messageIndex(type);
 
         if (msgIdx != -1) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers;
+            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
             IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId);
 
@@ -1208,17 +1232,17 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             cacheClsHandlers[msgIdx] = c;
 
-            idxClsHandlers = idxClsHandlers0;
+            msgHandlers.idxClsHandlers = idxClsHandlers0;
 
             return;
         }
         else {
             ListenerKey key = new ListenerKey(hndId, type);
 
-            if (clsHandlers.putIfAbsent(key,
+            if (msgHandlers.clsHandlers.putIfAbsent(key,
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
                 assert false : "Handler for class already registered [hndId=" 
+ hndId + ", cls=" + type +
-                    ", old=" + clsHandlers.get(key) + ", new=" + c + ']';
+                    ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c 
+ ']';
         }
 
         IgniteLogger log0 = log;
@@ -1232,25 +1256,43 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /**
      * @param cacheId Cache ID to remove handlers for.
      */
-    public void removeHandlers(int cacheId) {
-        assert cacheId != 0;
+    void removeCacheHandlers(int cacheId) {
+        removeHandlers(cacheHandlers, cacheId);
+    }
+
+    /**
+     * @param grpId Cache group ID to remove handlers for.
+     */
+    void removeCacheGroupHandlers(int grpId) {
+        removeHandlers(grpHandlers, grpId);
+    }
 
-        idxClsHandlers.remove(cacheId);
+    /**
+     * @param msgHandlers Handlers.
+     * @param hndId ID to remove handlers for.
+     */
+    private void removeHandlers(MessageHandlers msgHandlers, int hndId) {
+        assert hndId != 0;
 
-        for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); 
iter.hasNext(); ) {
+        msgHandlers.idxClsHandlers.remove(hndId);
+
+        for (Iterator<ListenerKey> iter = 
msgHandlers.clsHandlers.keySet().iterator(); iter.hasNext(); ) {
             ListenerKey key = iter.next();
 
-            if (key.cacheId == cacheId)
+            if (key.hndId == hndId)
                 iter.remove();
         }
     }
 
     /**
-     * @param cacheId Cache ID to remove handlers for.
+     * @param cacheGrp {@code True} if cache group handler, {@code false} if 
cache handler.
+     * @param hndId Handler ID.
      * @param type Message type.
      */
-    public void removeHandler(int cacheId, Class<? extends GridCacheMessage> 
type) {
-        clsHandlers.remove(new ListenerKey(cacheId, type));
+    public void removeHandler(boolean cacheGrp, int hndId, Class<? extends 
GridCacheMessage> type) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
+        msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type));
     }
 
     /**
@@ -1274,14 +1316,17 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /**
      * Adds ordered message handler.
      *
+     * @param cacheGrp {@code True} if cache group message, {@code false} if 
cache message.
      * @param topic Topic.
      * @param c Handler.
      */
     @SuppressWarnings({"unchecked"})
-    public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? 
extends GridCacheMessage> c) {
+    public void addOrderedHandler(boolean cacheGrp, Object topic, 
IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
         IgniteLogger log0 = log;
 
-        if (orderedHandlers.putIfAbsent(topic, c) == null) {
+        if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) {
             cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
                 (IgniteBiInClosure<UUID, GridCacheMessage>)c));
 
@@ -1296,10 +1341,13 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /**
      * Removed ordered message handler.
      *
+     * @param cacheGrp {@code True} if cache group message, {@code false} if 
cache message.
      * @param topic Topic.
      */
-    public void removeOrderedHandler(Object topic) {
-        if (orderedHandlers.remove(topic) != null) {
+    public void removeOrderedHandler(boolean cacheGrp, Object topic) {
+        MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers;
+
+        if (msgHandlers.orderedHandlers.remove(topic) != null) {
             cctx.gridIO().removeMessageListener(topic);
 
             if (log != null && log.isDebugEnabled())
@@ -1354,8 +1402,26 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + 
cctx.igniteInstanceName() + ']');
-        X.println(">>>   clsHandlersSize: " + clsHandlers.size());
-        X.println(">>>   orderedHandlersSize: " + orderedHandlers.size());
+        X.println(">>>   cacheClsHandlersSize: " + 
cacheHandlers.clsHandlers.size());
+        X.println(">>>   cacheOrderedHandlersSize: " + 
cacheHandlers.orderedHandlers.size());
+        X.println(">>>   cacheGrpClsHandlersSize: " + 
grpHandlers.clsHandlers.size());
+        X.println(">>>   cacheGrpOrderedHandlersSize: " + 
grpHandlers.orderedHandlers.size());
+    }
+
+    /**
+     *
+     */
+    static class MessageHandlers {
+        /** Indexed class handlers. */
+        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new 
HashMap<>();
+
+        /** Handler registry. */
+        ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
+            clsHandlers = new ConcurrentHashMap8<>();
+
+        /** Ordered handler registry. */
+        ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends 
GridCacheMessage>> orderedHandlers =
+            new ConcurrentHashMap8<>();
     }
 
     /**
@@ -1389,17 +1455,17 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
      */
     private static class ListenerKey {
         /** Cache ID. */
-        private int cacheId;
+        private int hndId;
 
         /** Message class. */
         private Class<? extends GridCacheMessage> msgCls;
 
         /**
-         * @param cacheId Cache ID.
+         * @param hndId Handler ID.
          * @param msgCls Message class.
          */
-        private ListenerKey(int cacheId, Class<? extends GridCacheMessage> 
msgCls) {
-            this.cacheId = cacheId;
+        private ListenerKey(int hndId, Class<? extends GridCacheMessage> 
msgCls) {
+            this.hndId = hndId;
             this.msgCls = msgCls;
         }
 
@@ -1413,12 +1479,12 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             ListenerKey that = (ListenerKey)o;
 
-            return cacheId == that.cacheId && msgCls.equals(that.msgCls);
+            return hndId == that.hndId && msgCls.equals(that.msgCls);
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int res = cacheId;
+            int res = hndId;
 
             res = 31 * res + msgCls.hashCode();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index ec5efad..6578bc8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -90,6 +90,11 @@ public abstract class GridCacheMessage implements Message {
     public abstract int handlerId();
 
     /**
+     * @return {@code True} if cache group message.
+     */
+    public abstract boolean cacheGroupMessage();
+
+    /**
      * @return Error, if any.
      */
     @Nullable public Throwable error() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 98ad758..a666297 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -308,21 +308,21 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, 
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
-        cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
+        cctx.io().addHandler(false, 0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtPartitionsSingleMessage msg) {
                     processSinglePartitionUpdate(node, msg);
                 }
             });
 
-        cctx.io().addHandler(0, GridDhtPartitionsFullMessage.class,
+        cctx.io().addHandler(false, 0, GridDhtPartitionsFullMessage.class,
             new MessageHandler<GridDhtPartitionsFullMessage>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
                     processFullPartitionUpdate(node, msg);
                 }
             });
 
-        cctx.io().addHandler(0, GridDhtPartitionsSingleRequest.class,
+        cctx.io().addHandler(false, 0, GridDhtPartitionsSingleRequest.class,
             new MessageHandler<GridDhtPartitionsSingleRequest>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
                     processSinglePartitionRequest(node, msg);
@@ -381,7 +381,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
                 final int idx = cnt;
 
-                cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, 
GridCacheMessage>() {
+                cctx.io().addOrderedHandler(true, rebalanceTopic(cnt), new 
CI2<UUID, GridCacheMessage>() {
                     @Override public void apply(final UUID id, final 
GridCacheMessage m) {
                         if (!enterBusy())
                             return;
@@ -498,9 +498,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
-        cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class);
-        cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
-        cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
+        cctx.io().removeHandler(false, 0, 
GridDhtPartitionsSingleMessage.class);
+        cctx.io().removeHandler(false, 0, GridDhtPartitionsFullMessage.class);
+        cctx.io().removeHandler(false, 0, 
GridDhtPartitionsSingleRequest.class);
 
         stopErr = cctx.kernalContext().clientDisconnected() ?
             new 
IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
@@ -520,7 +520,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++)
-                cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+                cctx.io().removeOrderedHandler(true, rebalanceTopic(cnt));
         }
 
         U.cancel(exchWorker);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 55f3c42..6be440d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -404,7 +404,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @param cacheCtx Cache context to remove.
      */
-    public void removeCacheContext(GridCacheContext cacheCtx) {
+    void removeCacheContext(GridCacheContext cacheCtx) {
         int cacheId = cacheCtx.cacheId();
 
         ctxMap.remove(cacheId, cacheCtx);
@@ -415,7 +415,7 @@ public class GridCacheSharedContext<K, V> {
             locStoreCnt.decrementAndGet();
 
         // Safely clean up the message listeners.
-        ioMgr.removeHandlers(cacheId);
+        ioMgr.removeCacheHandlers(cacheId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
index bfe6eee..a64c8bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
@@ -60,6 +60,11 @@ public class GridChangeGlobalStateMessageResponse extends 
GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 79db810..c36e633 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -76,6 +76,11 @@ public class GridDistributedTxFinishResponse extends 
GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public final int partition() {
         return part;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index d90ee6c..f58e0df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -171,7 +171,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, 
new CI2<UUID, GridCacheTtlUpdateRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), 
GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
             @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest 
req) {
                 processTtlUpdateRequest(req);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 9b61f14..8f46f89 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -118,43 +118,43 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new 
CI2<UUID, GridNearGetRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearGetRequest.class, 
new CI2<UUID, GridNearGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearGetRequest req) {
                 processNearGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new 
CI2<UUID, GridNearSingleGetRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), 
GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetRequest 
req) {
                 processNearSingleGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new 
CI2<UUID, GridNearLockRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearLockRequest.class, 
new CI2<UUID, GridNearLockRequest>() {
             @Override public void apply(UUID nodeId, GridNearLockRequest req) {
                 processNearLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new 
CI2<UUID, GridDhtLockRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockRequest.class, 
new CI2<UUID, GridDhtLockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
                 processDhtLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new 
CI2<UUID, GridDhtLockResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtLockResponse.class, 
new CI2<UUID, GridDhtLockResponse>() {
             @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
                 processDhtLockResponse(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new 
CI2<UUID, GridNearUnlockRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearUnlockRequest.class, 
new CI2<UUID, GridNearUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridNearUnlockRequest 
req) {
                 processNearUnlockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new 
CI2<UUID, GridDhtUnlockRequest>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtUnlockRequest.class, 
new CI2<UUID, GridDhtUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) 
{
                 processDhtUnlockRequest(nodeId, req);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
index 3b68a5a..67eacd3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -52,6 +52,11 @@ public class GridDhtTxOnePhaseCommitAckRequest extends 
GridCacheMessage {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      *
      * @param vers Near Tx xid Versions.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2d855fe..c080470 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -239,6 +239,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         metrics = m;
 
         ctx.io().addHandler(
+            false,
             ctx.cacheId(),
             GridNearGetRequest.class,
             new CI2<UUID, GridNearGetRequest>() {
@@ -253,6 +254,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             });
 
         ctx.io().addHandler(
+            false,
             ctx.cacheId(),
             GridNearSingleGetRequest.class,
             new CI2<UUID, GridNearSingleGetRequest>() {
@@ -267,6 +269,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             });
 
         ctx.io().addHandler(
+            false,
             ctx.cacheId(),
             GridNearAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
@@ -285,7 +288,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addHandler(
+            false,
+            ctx.cacheId(),
             GridNearAtomicUpdateResponse.class,
             new CI2<UUID, GridNearAtomicUpdateResponse>() {
                 @Override public void apply(
@@ -304,6 +309,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             });
 
         ctx.io().addHandler(
+            false,
             ctx.cacheId(),
             GridDhtAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
@@ -323,6 +329,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             });
 
         ctx.io().addHandler(
+            false,
             ctx.cacheId(),
             GridDhtAtomicUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicUpdateResponse>() {
@@ -341,7 +348,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addHandler(
+            false,
+            ctx.cacheId(),
             GridDhtAtomicDeferredUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
                 @Override public void apply(
@@ -359,7 +368,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addHandler(
+            false,
+            ctx.cacheId(),
             GridDhtAtomicNearResponse.class,
             new CI2<UUID, GridDhtAtomicNearResponse>() {
             @Override public void apply(UUID uuid, GridDhtAtomicNearResponse 
msg) {
@@ -372,7 +383,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addHandler(
+            false,
+            ctx.cacheId(),
             GridNearAtomicCheckUpdateRequest.class,
             new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
                 @Override public void apply(UUID uuid, 
GridNearAtomicCheckUpdateRequest msg) {
@@ -387,6 +400,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         if (near == null) {
             ctx.io().addHandler(
+                false,
                 ctx.cacheId(),
                 GridNearGetResponse.class,
                 new CI2<UUID, GridNearGetResponse>() {
@@ -401,6 +415,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 });
 
             ctx.io().addHandler(
+                false,
                 ctx.cacheId(),
                 GridNearSingleGetResponse.class,
                 new CI2<UUID, GridNearSingleGetResponse>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 9ee701a..6ad1d9d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -132,19 +132,19 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, 
new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processNearGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, 
new CI2<UUID, GridNearSingleGetResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), 
GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetResponse 
res) {
                 processNearSingleGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new 
CI2<UUID, GridNearLockResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, 
new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) 
{
                 processLockResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index d5a60ef..441952d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return GridIoMessage.STRIPE_DISABLED_PART;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 3d62c2f..8dfb4d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -161,14 +161,14 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("Starting DHT rebalancer...");
 
-        ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class,
+        ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysRequest.class,
             new MessageHandler<GridDhtForceKeysRequest>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysRequest msg) {
                     processForceKeysRequest(node, msg);
                 }
             });
 
-        ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class,
+        ctx.io().addHandler(true, grp.groupId(), 
GridDhtForceKeysResponse.class,
             new MessageHandler<GridDhtForceKeysResponse>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysResponse msg) {
                     processForceKeyResponse(node, msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2d5c8a5..0c33edc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, 
new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index cc90be0..0b9a1c8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -87,13 +87,13 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearGetResponse.class, 
new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new 
CI2<UUID, GridNearLockResponse>() {
+        ctx.io().addHandler(false, ctx.cacheId(), GridNearLockResponse.class, 
new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) 
{
                 processLockResponse(nodeId, res);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 06a3416..bb525bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -104,7 +104,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
 
         assert cctx.config().getCacheMode() != LOCAL;
 
-        cctx.io().addHandler(cctx.cacheId(), GridCacheQueryRequest.class, new 
CI2<UUID, GridCacheQueryRequest>() {
+        cctx.io().addHandler(false, cctx.cacheId(), 
GridCacheQueryRequest.class, new CI2<UUID, GridCacheQueryRequest>() {
             @Override public void apply(UUID nodeId, GridCacheQueryRequest 
req) {
                 processQueryRequest(nodeId, req);
             }
@@ -560,11 +560,11 @@ public class GridCacheDistributedQueryManager<K, V> 
extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addOrderedHandler(false, topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removeOrderedHandler(false, topic);
                 }
             });
 
@@ -744,11 +744,11 @@ public class GridCacheDistributedQueryManager<K, V> 
extends GridCacheQueryManage
 
             final Object topic = topic(cctx.nodeId(), req.id());
 
-            cctx.io().addOrderedHandler(topic, resHnd);
+            cctx.io().addOrderedHandler(false, topic, resHnd);
 
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(topic);
+                    cctx.io().removeOrderedHandler(false, topic);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index a1e0bad..03e1e1c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -127,7 +127,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + 
cctx.name());
 
         if (cctx.affinityNode()) {
-            cctx.io().addHandler(cctx.cacheId(), 
CacheContinuousQueryBatchAck.class,
+            cctx.io().addHandler(false, cctx.cacheId(), 
CacheContinuousQueryBatchAck.class,
                 new CI2<UUID, CacheContinuousQueryBatchAck>() {
                     @Override public void apply(UUID uuid, 
CacheContinuousQueryBatchAck msg) {
                         CacheContinuousQueryListener lsnr = 
lsnrs.get(msg.routineId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index c6dc114..e686252 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -136,68 +136,68 @@ public class IgniteTxHandler {
         txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
         txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
 
-        ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridNearTxPrepareRequest.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareRequest(nodeId, 
(GridNearTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridNearTxPrepareResponse.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareResponse(nodeId, 
(GridNearTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridNearTxFinishRequest.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, 
(GridNearTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridNearTxFinishResponse.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishResponse(nodeId, 
(GridNearTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridDhtTxPrepareRequest.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareRequest(nodeId, 
(GridDhtTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridDhtTxPrepareResponse.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareResponse(nodeId, 
(GridDhtTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridDhtTxFinishRequest.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new 
CI2<UUID, GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridDhtTxOnePhaseCommitAckRequest.class, 
new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxOnePhaseCommitAckRequest(nodeId, 
(GridDhtTxOnePhaseCommitAckRequest)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, 
GridCacheMessage>() {
+        ctx.io().addHandler(false, 0, GridDhtTxFinishResponse.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, 
(GridDhtTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class,
+        ctx.io().addHandler(false, 0, GridCacheTxRecoveryRequest.class,
             new CI2<UUID, GridCacheTxRecoveryRequest>() {
                 @Override public void apply(UUID nodeId, 
GridCacheTxRecoveryRequest req) {
                     processCheckPreparedTxRequest(nodeId, req);
                 }
             });
 
-        ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class,
+        ctx.io().addHandler(false, 0, GridCacheTxRecoveryResponse.class,
             new CI2<UUID, GridCacheTxRecoveryResponse>() {
                 @Override public void apply(UUID nodeId, 
GridCacheTxRecoveryResponse res) {
                     processCheckPreparedTxResponse(nodeId, res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
index a1fb9a3..94fe005 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java
@@ -75,6 +75,11 @@ public class TxLocksRequest extends GridCacheMessage {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
index ab2e579..a5c8f09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java
@@ -78,6 +78,11 @@ public class TxLocksResponse extends GridCacheMessage {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index b25b229..dbdc47c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -134,7 +134,7 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter {
         cacheProc = ctx.cache();
         sharedCtx = cacheProc.context();
 
-        sharedCtx.io().addHandler(0,
+        sharedCtx.io().addHandler(false, 0,
             GridChangeGlobalStateMessageResponse.class,
             new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
                 @Override public void apply(UUID nodeId, 
GridChangeGlobalStateMessageResponse msg) {
@@ -194,7 +194,7 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter {
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
         super.stop(cancel);
 
-        sharedCtx.io().removeHandler(0, 
GridChangeGlobalStateMessageResponse.class);
+        sharedCtx.io().removeHandler(false, 0, 
GridChangeGlobalStateMessageResponse.class);
         ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, 
EVT_NODE_FAILED);
 
         IgniteCheckedException stopErr = new IgniteInterruptedCheckedException(

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index f5cf177..3e68260 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -188,6 +188,11 @@ public class GridCacheConditionalDeploymentSelfTest 
extends GridCommonAbstractTe
         }
 
         /** {@inheritDoc} */
+        @Override public boolean cacheGroupMessage() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public short directType() {
             return DIRECT_TYPE;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index bf99013..4ba4cfe 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -177,7 +177,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCreateCache1() throws Exception {
-        Ignite srv0 = ignite(0);
+        Ignite srv0 = startGrid(0);
 
         {
             IgniteCache<Object, Object> cache1 = 
srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 2));
@@ -215,7 +215,7 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testCreateCache2() throws Exception {
-        Ignite srv0 = ignite(0);
+        Ignite srv0 = startGrid(0);
 
         {
             IgniteCache<Object, Object> cache1 = 
srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 0));

http://git-wip-us.apache.org/repos/asf/ignite/blob/b53950ac/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index ab5bfd5..4a6b765 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -221,6 +221,11 @@ public class GridCacheMessageSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean cacheGroupMessage() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean addDeploymentInfo() {
             return false;
         }
@@ -321,6 +326,11 @@ public class GridCacheMessageSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean cacheGroupMessage() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean addDeploymentInfo() {
             return false;
         }
@@ -449,6 +459,11 @@ public class GridCacheMessageSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean cacheGroupMessage() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean addDeploymentInfo() {
             return false;
         }

Reply via email to