This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 910b7d58837 IGNITE-19115 Fixed handling cache messages for recreated 
cache (#10618)
910b7d58837 is described below

commit 910b7d5883711eb31224d03860f92e18470880b9
Author: Slava Koptilin <slava.kopti...@gmail.com>
AuthorDate: Fri Apr 14 11:29:55 2023 +0300

    IGNITE-19115 Fixed handling cache messages for recreated cache (#10618)
---
 .../processors/cache/GridCacheAdapter.java         |   2 +-
 .../processors/cache/GridCacheIoManager.java       | 125 ++++--
 .../processors/cache/GridCacheMessage.java         |   5 +-
 .../cache/GridCachePartitionExchangeManager.java   |   6 +-
 .../processors/cache/GridCacheProcessor.java       |   6 +
 .../cache/distributed/dht/GridDhtCache.java        |   2 -
 .../cache/distributed/dht/GridDhtCacheAdapter.java |   8 +-
 .../dht/GridDhtTransactionalCacheAdapter.java      | 117 ++----
 .../distributed/dht/atomic/GridDhtAtomicCache.java |  74 ++--
 .../dht/colocated/GridDhtColocatedCache.java       |  44 ++-
 .../distributed/near/GridNearAtomicCache.java      |  16 +-
 .../near/GridNearTransactionalCache.java           |  30 +-
 .../query/GridCacheDistributedQueryManager.java    |  34 +-
 .../cache/query/GridCacheQueryRequest.java         |   6 +-
 .../continuous/CacheContinuousQueryManager.java    |  17 +-
 .../cache/transactions/IgniteTxEntry.java          |  33 +-
 .../cache/transactions/IgniteTxHandler.java        |  48 +--
 .../cluster/GridClusterStateProcessor.java         |   2 +-
 .../distributed/dht/IgniteCacheRecreateTest.java   | 421 +++++++++++++++++++++
 .../communication/GridCacheMessageSelfTest.java    |   6 +-
 .../testsuites/IgniteCacheRestartTestSuite2.java   |   3 +
 21 files changed, 742 insertions(+), 263 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9a5e378ced9..55cd09b4e94 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -646,7 +646,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      *
      * @throws IgniteCheckedException If callback failed.
      */
-    protected void onKernalStart() throws IgniteCheckedException {
+    public void onKernalStart() throws IgniteCheckedException {
         // No-op.
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index cb7bef39cf8..c9457994b16 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -102,6 +103,7 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.util.IgniteUtils.nl;
 
 /**
@@ -117,6 +119,9 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /** */
     private static final int MAX_STORED_PENDING_MESSAGES = 100;
 
+    /** Common message handler identifier that does not correspond to any 
particular cache. */
+    public static final int COMMON_MESSAGE_HANDLER_ID = 0;
+
     /** Delay in milliseconds between retries. */
     private long retryDelay;
 
@@ -334,19 +339,27 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         try {
             int msgIdx = cacheMsg.lookupIndex();
 
+            AffinityTopologyVersion msgTopVer = cacheMsg.topologyVersion();
+
             IgniteBiInClosure<UUID, GridCacheMessage> c = null;
 
             if (msgIdx >= 0) {
-                Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
+                Map<Integer, IndexedClassHandler> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
-                IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers0.get(cacheMsg.handlerId());
+                IndexedClassHandler cacheClsHandlers = 
idxClsHandlers0.get(cacheMsg.handlerId());
 
-                if (cacheClsHandlers != null)
-                    c = cacheClsHandlers[msgIdx];
+                if (cacheClsHandlers != null &&
+                    (NONE.equals(msgTopVer) || 
!msgTopVer.before(cacheClsHandlers.startTopVer)))
+                    c = cacheClsHandlers.hndls[msgIdx];
             }
 
-            if (c == null)
-                c = msgHandlers.clsHandlers.get(new 
ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
+            if (c == null) {
+                RegularClassHandler rHnd = msgHandlers.clsHandlers.get(
+                        new ListenerKey(cacheMsg.handlerId(), 
cacheMsg.getClass()));
+
+                if (rHnd != null && (NONE.equals(msgTopVer) || 
!msgTopVer.before(rHnd.startTopVer)))
+                    c = rHnd.hnd;
+            }
 
             if (c == null) {
                 if (processMissedHandler(nodeId, cacheMsg))
@@ -365,10 +378,10 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
                 msg0.append(nl()).append("Registered listeners:");
 
-                Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
+                Map<Integer, IndexedClassHandler> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
-                for (Map.Entry<Integer, IgniteBiInClosure[]> e : 
idxClsHandlers0.entrySet())
-                    
msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+                for (Map.Entry<Integer, IndexedClassHandler> e : 
idxClsHandlers0.entrySet())
+                    
msg0.append(nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue().hndls));
 
                 if (cctx.kernalContext().isStopping()) {
                     if (log.isDebugEnabled())
@@ -1402,12 +1415,30 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
      */
     public <Msg extends GridCacheMessage> void addCacheHandler(
         int hndId,
+        AffinityTopologyVersion startTopVer,
         Class<Msg> type,
         IgniteBiInClosure<UUID, ? super Msg> c
     ) {
         assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
 
-        addHandler(hndId, type, c, cacheHandlers);
+        addHandler(hndId, startTopVer, type, c, cacheHandlers);
+    }
+
+    /**
+     * Registers a new message handler for the given message {@code type}.
+     * This method is equivalent to {@link #addCacheHandler(int, 
AffinityTopologyVersion, Class, IgniteBiInClosure)} where
+     * the handler id equals {@link #COMMON_MESSAGE_HANDLER_ID} and 
the start topology version is {@link AffinityTopologyVersion#NONE}.
+     *
+     * @param type Type of message.
+     * @param c Handler.
+     */
+    public <Msg extends GridCacheMessage> void addCacheHandler(
+        Class<Msg> type,
+        IgniteBiInClosure<UUID, ? super Msg> c
+    ) {
+        assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
+
+        addHandler(COMMON_MESSAGE_HANDLER_ID, NONE, type, c, cacheHandlers);
     }
 
     /**
@@ -1421,7 +1452,9 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         int hndId,
         Class<? extends GridCacheMessage> msgCls
     ) {
-        return cacheHandlers.clsHandlers.get(new ListenerKey(hndId, msgCls));
+        RegularClassHandler clsHnd = cacheHandlers.clsHandlers.get(new 
ListenerKey(hndId, msgCls));
+
+        return (clsHnd != null) ? clsHnd.hnd : null;
     }
 
     /**
@@ -1436,7 +1469,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     ) {
         assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
 
-        addHandler(hndId, type, c, grpHandlers);
+        addHandler(hndId, NONE, type, c, grpHandlers);
     }
 
     /**
@@ -1447,6 +1480,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
      */
     private <Msg extends GridCacheMessage> void addHandler(
         int hndId,
+        AffinityTopologyVersion startTopVer,
         Class<Msg> type,
         IgniteBiInClosure<UUID, ? super Msg> c,
         MessageHandlers msgHandlers
@@ -1454,16 +1488,16 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         int msgIdx = messageIndex(type);
 
         if (msgIdx != -1) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
+            Map<Integer, IndexedClassHandler> idxClsHandlers0 = 
msgHandlers.idxClsHandlers;
 
-            IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
+            IndexedClassHandler cacheClsHandlers = 
idxClsHandlers0.compute(hndId, (key, clsHandlers) -> {
                 if (clsHandlers == null)
-                    clsHandlers = new 
IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+                    clsHandlers = new IndexedClassHandler(startTopVer);
 
-                if (clsHandlers[msgIdx] != null)
+                if (clsHandlers.hndls[msgIdx] != null || 
!Objects.equals(clsHandlers.startTopVer, startTopVer))
                     return null;
 
-                clsHandlers[msgIdx] = c;
+                clsHandlers.hndls[msgIdx] = c;
 
                 return clsHandlers;
             });
@@ -1476,11 +1510,12 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         }
         else {
             ListenerKey key = new ListenerKey(hndId, type);
+            RegularClassHandler regHnd = new RegularClassHandler(startTopVer, 
(IgniteBiInClosure<UUID, GridCacheMessage>)c);
 
-            if (msgHandlers.clsHandlers.putIfAbsent(key,
-                (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null)
+            if (msgHandlers.clsHandlers.putIfAbsent(key, regHnd) != null) {
                 assert false : "Handler for class already registered [hndId=" 
+ hndId + ", cls=" + type +
                     ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c 
+ ']';
+            }
         }
 
         IgniteLogger log0 = log;
@@ -1694,16 +1729,62 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         X.println(">>>   cacheGrpOrderedHandlersSize: " + 
grpHandlers.orderedHandlers.size());
     }
 
+    /** */
+    abstract static class MessageHandler {
+        /** Start topology version. */
+        final AffinityTopologyVersion startTopVer;
+
+        /**
+         * Creates a new message handler descriptor.
+         *
+         * @param startTopVer Start affinity topology version.
+         */
+        MessageHandler(AffinityTopologyVersion startTopVer) {
+            this.startTopVer = startTopVer;
+        }
+    }
+
+    /** */
+    static class IndexedClassHandler extends MessageHandler {
+        /** Actual handlers. */
+        final IgniteBiInClosure[] hndls = new 
IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX];
+
+        /**
+         * Creates a new message handler descriptor.
+         *
+         * @param startTopVer Start affinity topology version.
+         */
+        IndexedClassHandler(AffinityTopologyVersion startTopVer) {
+            super(startTopVer);
+        }
+    }
+
+    /** */
+    static class RegularClassHandler extends MessageHandler {
+        /** Actual handler. */
+        IgniteBiInClosure<UUID, GridCacheMessage> hnd;
+
+        /**
+         * Creates a new message handler descriptor.
+         *
+         * @param startTopVer Start affinity topology version.
+         */
+        RegularClassHandler(AffinityTopologyVersion startTopVer, 
IgniteBiInClosure<UUID, GridCacheMessage> hnd) {
+            super(startTopVer);
+
+            this.hnd = hnd;
+        }
+    }
+
     /**
      *
      */
     static class MessageHandlers {
         /** Indexed class handlers. */
-        volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new 
ConcurrentHashMap<>();
+        volatile Map<Integer, IndexedClassHandler> idxClsHandlers = new 
ConcurrentHashMap<>();
 
         /** Handler registry. */
-        ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
-            clsHandlers = new ConcurrentHashMap<>();
+        ConcurrentMap<ListenerKey, RegularClassHandler> clsHandlers = new 
ConcurrentHashMap<>();
 
         /** Ordered handler registry. */
         ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends 
GridCacheMessage>> orderedHandlers =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index a9685a1eb8d..c655dc487ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -442,8 +442,11 @@ public abstract class GridCacheMessage implements Message {
         assert ctx != null;
 
         if (txEntries != null) {
-            for (IgniteTxEntry e : txEntries)
+            for (IgniteTxEntry e : txEntries) {
+                e.prepareUnmarshal(ctx, topologyVersion(), near);
+
                 e.unmarshal(ctx, near, ldr);
+            }
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4115e56fa87..3957151db29 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -413,7 +413,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, 
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
 
-        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class,
+        cctx.io().addCacheHandler(GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage>() {
                 @Override public void onMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
                     GridDhtPartitionExchangeId exchangeId = msg.exchangeId();
@@ -450,7 +450,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 }
             });
 
-        cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class,
+        cctx.io().addCacheHandler(GridDhtPartitionsFullMessage.class,
             new MessageHandler<GridDhtPartitionsFullMessage>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
                     if (msg.exchangeId() == null) {
@@ -470,7 +470,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 }
             });
 
-        cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class,
+        cctx.io().addCacheHandler(GridDhtPartitionsSingleRequest.class,
             new MessageHandler<GridDhtPartitionsSingleRequest>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
                     processSinglePartitionRequest(node, msg);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d8bbc50a7f7..3ad724da2c1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2074,6 +2074,12 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         cacheContext.finishRecovery(cacheStartVer, updatedDescriptor);
 
+        if (isNearEnabled(cacheContext)) {
+            GridDhtCacheAdapter dht = cacheContext.near().dht();
+
+            dht.context().finishRecovery(cacheStartVer, updatedDescriptor);
+        }
+
         if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT 
&& groupContext.affinityNode())
             sharedCtx.coordinators().ensureStarted();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
index 7889163f177..455224bdd51 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
@@ -76,8 +76,6 @@ public class GridDhtCache<K, V> extends 
GridDhtTransactionalCacheAdapter<K, V> {
         metrics.delegate(ctx.dht().near().metrics0());
 
         ctx.dr().resetMetrics();
-
-        super.start();
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 557153c0495..a785bb6bc80 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -387,8 +387,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridCacheTtlUpdateRequest.class,
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
+
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridCacheTtlUpdateRequest.class,
             (CI2<UUID, 
GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest);
 
         ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, 
EVT_NODE_FAILED);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fcd7ab1d3be..5c2e241e247 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -141,125 +141,80 @@ public abstract class 
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new 
CI2<UUID, GridNearGetRequest>() {
-            @Override public void apply(UUID nodeId, GridNearGetRequest req) {
-                processNearGetRequest(nodeId, req);
-            }
-        });
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
-            @Override public void apply(UUID nodeId, GridNearSingleGetRequest 
req) {
-                processNearSingleGetRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearGetRequest.class,
+            (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new 
CI2<UUID, GridNearLockRequest>() {
-            @Override public void apply(UUID nodeId, GridNearLockRequest req) {
-                processNearLockRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearSingleGetRequest.class,
+            (CI2<UUID, 
GridNearSingleGetRequest>)this::processNearSingleGetRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new 
CI2<UUID, GridDhtLockRequest>() {
-            @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
-                processDhtLockRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearLockRequest.class,
+            (CI2<UUID, GridNearLockRequest>)this::processNearLockRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new 
CI2<UUID, GridDhtLockResponse>() {
-            @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
-                processDhtLockResponse(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtLockRequest.class,
+            (CI2<UUID, GridDhtLockRequest>)this::processDhtLockRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, 
new CI2<UUID, GridNearUnlockRequest>() {
-            @Override public void apply(UUID nodeId, GridNearUnlockRequest 
req) {
-                processNearUnlockRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtLockResponse.class,
+            (CI2<UUID, GridDhtLockResponse>)this::processDhtLockResponse);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, 
new CI2<UUID, GridDhtUnlockRequest>() {
-            @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) 
{
-                processDhtUnlockRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearUnlockRequest.class,
+            (CI2<UUID, GridNearUnlockRequest>)this::processNearUnlockRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearTxQueryEnlistRequest.class, new CI2<UUID, 
GridNearTxQueryEnlistRequest>() {
-            @Override public void apply(UUID nodeId, 
GridNearTxQueryEnlistRequest req) {
-                processNearTxQueryEnlistRequest(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtUnlockRequest.class,
+            (CI2<UUID, GridDhtUnlockRequest>)this::processDhtUnlockRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearTxQueryEnlistResponse.class, new CI2<UUID, 
GridNearTxQueryEnlistResponse>() {
-            @Override public void apply(UUID nodeId, 
GridNearTxQueryEnlistResponse req) {
-                processNearTxQueryEnlistResponse(nodeId, req);
-            }
-        });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxQueryEnlistRequest.class,
+            (CI2<UUID, 
GridNearTxQueryEnlistRequest>)this::processNearTxQueryEnlistRequest);
+
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxQueryEnlistResponse.class,
+            (CI2<UUID, 
GridNearTxQueryEnlistResponse>)this::processNearTxQueryEnlistResponse);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class,
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtForceKeysRequest.class,
             new MessageHandler<GridDhtForceKeysRequest>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysRequest msg) {
                     processForceKeysRequest(node, msg);
                 }
             });
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class,
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtForceKeysResponse.class,
             new MessageHandler<GridDhtForceKeysResponse>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysResponse msg) {
                     processForceKeyResponse(node, msg);
                 }
             });
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearTxQueryResultsEnlistRequest.class,
-            new CI2<UUID, GridNearTxQueryResultsEnlistRequest>() {
-                @Override public void apply(UUID nodeId, 
GridNearTxQueryResultsEnlistRequest req) {
-                    processNearTxQueryResultsEnlistRequest(nodeId, req);
-                }
-            });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxQueryResultsEnlistRequest.class,
+            (CI2<UUID, 
GridNearTxQueryResultsEnlistRequest>)this::processNearTxQueryResultsEnlistRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearTxQueryResultsEnlistResponse.class,
-            new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() {
-                @Override public void apply(UUID nodeId, 
GridNearTxQueryResultsEnlistResponse req) {
-                    processNearTxQueryResultsEnlistResponse(nodeId, req);
-                }
-            });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxQueryResultsEnlistResponse.class,
+            (CI2<UUID, 
GridNearTxQueryResultsEnlistResponse>)this::processNearTxQueryResultsEnlistResponse);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class,
-            new CI2<UUID, GridNearTxEnlistRequest>() {
-                @Override public void apply(UUID nodeId, 
GridNearTxEnlistRequest req) {
-                    processNearTxEnlistRequest(nodeId, req);
-                }
-            });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxEnlistRequest.class,
+            (CI2<UUID, 
GridNearTxEnlistRequest>)this::processNearTxEnlistRequest);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class,
-            new CI2<UUID, GridNearTxEnlistResponse>() {
-                @Override public void apply(UUID nodeId, 
GridNearTxEnlistResponse msg) {
-                    processNearTxEnlistResponse(nodeId, msg);
-                }
-            });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridNearTxEnlistResponse.class,
+            (CI2<UUID, 
GridNearTxEnlistResponse>)this::processNearTxEnlistResponse);
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridDhtTxQueryEnlistRequest.class,
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtTxQueryEnlistRequest.class,
             new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
                 @Override public void apply(UUID nodeId, 
GridDhtTxQueryEnlistRequest msg) {
                     processDhtTxQueryEnlistRequest(nodeId, msg, false);
                 }
             });
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridDhtTxQueryFirstEnlistRequest.class,
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtTxQueryFirstEnlistRequest.class,
             new CI2<UUID, GridDhtTxQueryEnlistRequest>() {
                 @Override public void apply(UUID nodeId, 
GridDhtTxQueryEnlistRequest msg) {
                     processDhtTxQueryEnlistRequest(nodeId, msg, true);
                 }
             });
 
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridDhtTxQueryEnlistResponse.class,
-            new CI2<UUID, GridDhtTxQueryEnlistResponse>() {
-                @Override public void apply(UUID nodeId, 
GridDhtTxQueryEnlistResponse msg) {
-                    processDhtTxQueryEnlistResponse(nodeId, msg);
-                }
-            });
+        ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), 
GridDhtTxQueryEnlistResponse.class,
+            (CI2<UUID, 
GridDhtTxQueryEnlistResponse>)this::processDhtTxQueryEnlistResponse);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6b00da0ecfa..c3b2da0a9b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -241,44 +241,26 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        assert metrics != null : "Cache metrics instance isn't initialized.";
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
 
-        if (ctx.dht().near() != null)
-            metrics.delegate(ctx.dht().near().metrics0());
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridNearGetRequest.class,
-            new CI2<UUID, GridNearGetRequest>() {
-                @Override public void apply(
-                    UUID nodeId,
-                    GridNearGetRequest req
-                ) {
-                    processNearGetRequest(
-                        nodeId,
-                        req);
-                }
-            });
+            (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest);
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridNearSingleGetRequest.class,
-            new CI2<UUID, GridNearSingleGetRequest>() {
-                @Override public void apply(
-                    UUID nodeId,
-                    GridNearSingleGetRequest req
-                ) {
-                    processNearSingleGetRequest(
-                        nodeId,
-                        req);
-                }
-            });
+            (CI2<UUID, 
GridNearSingleGetRequest>)this::processNearSingleGetRequest);
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridNearAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
                 @Override public void apply(
@@ -298,6 +280,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridNearAtomicUpdateResponse.class,
             new CI2<UUID, GridNearAtomicUpdateResponse>() {
                 @Override public void apply(
@@ -317,6 +300,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
                 @Override public void apply(
@@ -336,6 +320,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtAtomicUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicUpdateResponse>() {
                 @Override public void apply(
@@ -355,6 +340,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtAtomicDeferredUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
                 @Override public void apply(
@@ -374,6 +360,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtAtomicNearResponse.class,
             new CI2<UUID, GridDhtAtomicNearResponse>() {
                 @Override public void apply(UUID uuid, 
GridDhtAtomicNearResponse msg) {
@@ -388,6 +375,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridNearAtomicCheckUpdateRequest.class,
             new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
                 @Override public void apply(UUID uuid, 
GridNearAtomicCheckUpdateRequest msg) {
@@ -402,6 +390,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtForceKeysRequest.class,
             new MessageHandler<GridDhtForceKeysRequest>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysRequest msg) {
@@ -411,6 +400,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addCacheHandler(
             ctx.cacheId(),
+            ctx.startTopologyVersion(),
             GridDhtForceKeysResponse.class,
             new MessageHandler<GridDhtForceKeysResponse>() {
                 @Override public void onMessage(ClusterNode node, 
GridDhtForceKeysResponse msg) {
@@ -421,34 +411,26 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (near == null) {
             ctx.io().addCacheHandler(
                 ctx.cacheId(),
+                ctx.startTopologyVersion(),
                 GridNearGetResponse.class,
-                new CI2<UUID, GridNearGetResponse>() {
-                    @Override public void apply(
-                        UUID nodeId,
-                        GridNearGetResponse res
-                    ) {
-                        processNearGetResponse(
-                            nodeId,
-                            res);
-                    }
-                });
+                (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse);
 
             ctx.io().addCacheHandler(
                 ctx.cacheId(),
+                ctx.startTopologyVersion(),
                 GridNearSingleGetResponse.class,
-                new CI2<UUID, GridNearSingleGetResponse>() {
-                    @Override public void apply(
-                        UUID nodeId,
-                        GridNearSingleGetResponse res
-                    ) {
-                        processNearSingleGetResponse(
-                            nodeId,
-                            res);
-                    }
-                });
+                (CI2<UUID, 
GridNearSingleGetResponse>)this::processNearSingleGetResponse);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        assert metrics != null : "Cache metrics instance isn't initialized.";
+
+        if (ctx.dht().near() != null)
+            metrics.delegate(ctx.dht().near().metrics0());
+    }
+
     /**
      * @param near Near cache.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f6601a79e8e..4da88bef33a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -120,26 +120,32 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
-            @Override public void apply(UUID nodeId, GridNearGetResponse res) {
-                processNearGetResponse(nodeId, res);
-            }
-        });
-
-        ctx.io().addCacheHandler(ctx.cacheId(), 
GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
-            @Override public void apply(UUID nodeId, GridNearSingleGetResponse 
res) {
-                processNearSingleGetResponse(nodeId, res);
-            }
-        });
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearGetResponse.class,
+            (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse);
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearSingleGetResponse.class,
+            (CI2<UUID, 
GridNearSingleGetResponse>)this::processNearSingleGetResponse);
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearLockResponse.class,
+            (CI2<UUID, GridNearLockResponse>)this::processNearLockResponse);
+    }
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, 
new CI2<UUID, GridNearLockResponse>() {
-            @Override public void apply(UUID nodeId, GridNearLockResponse res) 
{
-                processNearLockResponse(nodeId, res);
-            }
-        });
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 6068f0fe6e0..2a3cf58b139 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -104,14 +104,16 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
 
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
-            @Override public void apply(UUID nodeId, GridNearGetResponse res) {
-                processGetResponse(nodeId, res);
-            }
-        });
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearGetResponse.class,
+            (CI2<UUID, GridNearGetResponse>)this::processGetResponse);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 5887b8d95d8..26789218f92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -86,20 +86,22 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new 
CI2<UUID, GridNearGetResponse>() {
-            @Override public void apply(UUID nodeId, GridNearGetResponse res) {
-                processGetResponse(nodeId, res);
-            }
-        });
-
-        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, 
new CI2<UUID, GridNearLockResponse>() {
-            @Override public void apply(UUID nodeId, GridNearLockResponse res) 
{
-                processLockResponse(nodeId, res);
-            }
-        });
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        assert !ctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + name() + ']';
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearGetResponse.class,
+            (CI2<UUID, GridNearGetResponse>)this::processGetResponse);
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            ctx.startTopologyVersion(),
+            GridNearLockResponse.class,
+            (CI2<UUID, GridNearLockResponse>)this::processLockResponse);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 0c6dbe3e1b9..f7b3f7a8b8f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
@@ -98,26 +99,27 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
     };
 
     /** Event listener. */
-    private GridLocalEventListener lsnr;
+    private GridLocalEventListener lsnr = new GridLocalEventListener() {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
-    /** {@inheritDoc} */
-    @Override public void start0() throws IgniteCheckedException {
-        super.start0();
+            for (GridCacheDistributedQueryFuture fut : futs.values())
+                fut.onNodeLeft(discoEvt.eventNode().id());
+        }
+    };
 
-        cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class, 
new CI2<UUID, GridCacheQueryRequest>() {
-            @Override public void apply(UUID nodeId, GridCacheQueryRequest 
req) {
-                processQueryRequest(nodeId, req);
-            }
-        });
+    /** {@inheritDoc} */
+    @Override public void onKernalStart0() throws IgniteCheckedException {
+        super.onKernalStart0();
 
-        lsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+        assert !cctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + cctx.name() + ']';
 
-                for (GridCacheDistributedQueryFuture fut : futs.values())
-                    fut.onNodeLeft(discoEvt.eventNode().id());
-            }
-        };
+        cctx.io().addCacheHandler(
+            cctx.cacheId(),
+            cctx.startTopologyVersion(),
+            GridCacheQueryRequest.class,
+            (CI2<UUID, GridCacheQueryRequest>)this::processQueryRequest);
 
         cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 0912db3c876..55e86fe9d24 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -195,7 +195,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
             qry.includeMetadata(),
             qry.keepBinary(),
             qry.taskHash(),
-            cctx.startTopologyVersion(),
+            cctx.affinity().affinityTopologyVersion(),
             qry.mvccSnapshot(),
             // Force deployment anyway if scan query is used.
             cctx.deploymentEnabled() || deployFilterOrTransformer,
@@ -220,7 +220,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
             false,
             qry.keepBinary(),
             qry.taskHash(),
-            cctx.startTopologyVersion(),
+            cctx.affinity().affinityTopologyVersion(),
             // Force deployment anyway if scan query is used.
             cctx.deploymentEnabled() || (qry.scanFilter() != null && 
cctx.gridDeploy().enabled()),
             qry.isDataPageScanEnabled());
@@ -236,7 +236,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
         return new GridCacheQueryRequest(cctx.cacheId(),
             reqId,
             fieldsQry,
-            cctx.startTopologyVersion(),
+            cctx.affinity().affinityTopologyVersion(),
             cctx.deploymentEnabled());
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6d5253c2e90..8fbd4c979ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -158,11 +158,22 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
         // Append cache name to the topic.
         topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + 
cctx.name());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        assert !cctx.isRecoveryMode() : "Registering message handlers in 
recovery mode [cacheName=" + cctx.name() + ']';
 
         if (cctx.affinityNode()) {
-            cctx.io().addCacheHandler(cctx.cacheId(), 
CacheContinuousQueryBatchAck.class,
+            cctx.io().addCacheHandler(
+                cctx.cacheId(),
+                cctx.startTopologyVersion(),
+                CacheContinuousQueryBatchAck.class,
                 new CI2<UUID, CacheContinuousQueryBatchAck>() {
                     @Override public void apply(UUID uuid, 
CacheContinuousQueryBatchAck msg) {
                         CacheContinuousQueryListener lsnr = 
lsnrs.get(msg.routineId());
@@ -174,11 +185,7 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
 
             cancelableTask = cctx.time().schedule(new BackupCleaner(lsnrs, 
cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
         }
-    }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
         Iterable<CacheEntryListenerConfiguration> cfgs = 
cctx.config().getCacheEntryListenerConfigurations();
 
         if (cfgs != null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 3780d842fc0..14647675e3d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@ -951,23 +952,23 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
     }
 
     /**
-     * Unmarshalls entry.
+     * Prepares this entry for unmarshalling. In particular, this method initializes the cache context.
      *
      * @param ctx Cache context.
+     * @param topVer Topology version that is used to validate a cache context.
+     *               If this parameter is {@code null} then validation will be skipped.
      * @param near Near flag.
-     * @param clsLdr Class loader.
      * @throws IgniteCheckedException If un-marshalling failed.
      */
-    public void unmarshal(
+    public void prepareUnmarshal(
         GridCacheSharedContext<?, ?> ctx,
-        boolean near,
-        ClassLoader clsLdr
+        AffinityTopologyVersion topVer,
+        boolean near
     ) throws IgniteCheckedException {
-
         if (this.ctx == null) {
             GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId);
 
-            if (cacheCtx == null)
+            if (cacheCtx == null || (topVer != null && 
topVer.before(cacheCtx.startTopologyVersion())))
                 throw new CacheInvalidStateException(
                     "Failed to perform cache operation (cache is stopped), 
cacheId=" + cacheId);
 
@@ -978,6 +979,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
 
             this.ctx = cacheCtx;
         }
+    }
+
+    /**
+     * Unmarshalls entry.
+     *
+     * @param ctx Cache context.
+     * @param near Near flag.
+     * @param clsLdr Class loader.
+     * @throws IgniteCheckedException If un-marshalling failed.
+     */
+    public void unmarshal(
+        GridCacheSharedContext<?, ?> ctx,
+        boolean near,
+        ClassLoader clsLdr
+    ) throws IgniteCheckedException {
+
+        if (this.ctx == null)
+            prepareUnmarshal(ctx, null, near);
 
         CacheObjectValueContext coctx = this.ctx.cacheObjectContext();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 506b589b8e4..4f8ef80316e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -108,6 +108,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheIoManager.COMMON_MESSAGE_HANDLER_ID;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
@@ -134,9 +135,6 @@ import static 
org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
  * Isolated logic to process cache messages.
  */
 public class IgniteTxHandler {
-    /** */
-    private static final int TX_MSG_HND_ID = 0;
-
     /** Logger. */
     private IgniteLogger log;
 
@@ -222,80 +220,68 @@ public class IgniteTxHandler {
         txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
         txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareRequest(nodeId, 
(GridNearTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxPrepareResponse(nodeId, 
(GridNearTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridNearTxFinishRequest.class, 
new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishRequest(nodeId, 
(GridNearTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processNearTxFinishResponse(nodeId, 
(GridNearTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxPrepareRequest.class, 
new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareRequest(nodeId, 
(GridDhtTxPrepareRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxPrepareResponse(nodeId, 
(GridDhtTxPrepareResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishRequest.class, 
new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new 
CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxOnePhaseCommitAckRequest(nodeId, 
(GridDhtTxOnePhaseCommitAckRequest)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, GridDhtTxFinishResponse.class, 
new CI2<UUID, GridCacheMessage>() {
+        ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID, 
GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, 
(GridDhtTxFinishResponse)msg);
             }
         });
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridCacheTxRecoveryRequest.class,
-            new CI2<UUID, GridCacheTxRecoveryRequest>() {
-                @Override public void apply(UUID nodeId, 
GridCacheTxRecoveryRequest req) {
-                    processCheckPreparedTxRequest(nodeId, req);
-                }
-            });
+        ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class,
+            (CI2<UUID, 
GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest);
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
GridCacheTxRecoveryResponse.class,
-            new CI2<UUID, GridCacheTxRecoveryResponse>() {
-                @Override public void apply(UUID nodeId, 
GridCacheTxRecoveryResponse res) {
-                    processCheckPreparedTxResponse(nodeId, res);
-                }
-            });
+        ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class,
+            (CI2<UUID, 
GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse);
 
-        ctx.io().addCacheHandler(TX_MSG_HND_ID, 
IncrementalSnapshotAwareMessage.class,
-            new CI2<UUID, IncrementalSnapshotAwareMessage>() {
-                @Override public void apply(UUID nodeId, 
IncrementalSnapshotAwareMessage msg) {
-                    processIncrementalSnapshotAwareMessage(nodeId, msg);
-                }
-            });
+        ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class,
+            (CI2<UUID, 
IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage);
     }
 
     /** */
@@ -310,7 +296,7 @@ public class IgniteTxHandler {
         GridCacheMessage cacheMsg = msg.payload();
 
         ctx.io()
-            .cacheHandler(TX_MSG_HND_ID, cacheMsg.getClass())
+            .cacheHandler(COMMON_MESSAGE_HANDLER_ID, cacheMsg.getClass())
             .apply(nodeId, cacheMsg);
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 14ac86ef782..da485b02294 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -892,7 +892,7 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
         sharedCtx = cacheProc.context();
 
         sharedCtx.io().addCacheHandler(
-            0, GridChangeGlobalStateMessageResponse.class,
+            GridChangeGlobalStateMessageResponse.class,
             this::processChangeGlobalStateResponse
         );
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
new file mode 100755
index 00000000000..aca4448f420
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheRecreateTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests handling of pending cache messages/operations when the required 
cache was re-created.
+ */
+public class IgniteCacheRecreateTest extends GridCommonAbstractTest {
+    /** Cache name to be used in tests. */
+    private static final String CACHE_NAME = "test-recreate-cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(1);
+
+        startClientGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        grid(0).destroyCache(CACHE_NAME);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicPutAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridNearAtomicAbstractUpdateRequest.class,
+            (cache, keys) -> cache.put(keys.get(0), 42));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicGetAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridNearSingleGetRequest.class,
+            (cache, keys) -> cache.get(keys.get(0)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicPutAllAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridNearAtomicFullUpdateRequest.class,
+            (cache, keys) -> {
+                Map<Integer, Integer> vals = new TreeMap<>();
+                vals.put(keys.get(0), 24);
+                vals.put(keys.get(1), 42);
+
+                cache.putAll(vals);
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicGetAllAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridNearGetRequest.class,
+            (cache, keys) -> {
+                Set<Integer> vals = new TreeSet<>();
+                vals.add(keys.get(0));
+                vals.add(keys.get(1));
+
+                cache.getAll(vals);
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitInvokeAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridNearAtomicAbstractUpdateRequest.class,
+            (cache, keys) -> {
+                cache.invoke(keys.get(0), new CacheEntryProcessor<Integer, 
Integer, Integer>() {
+                    @Override public Integer process(
+                        MutableEntry<Integer, Integer> entry,
+                        Object... arguments
+                    ) throws EntryProcessorException {
+                        if (entry.exists())
+                            return entry.getValue();
+
+                        entry.setValue(123);
+
+                        return entry.getValue();
+                    }
+                });
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitOptimisticTxPutAndCacheRecreate() throws Exception 
{
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearTxPrepareRequest.class,
+            (cache, keys) -> cache.put(keys.get(0), 42));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitOptimisticTxGetAndCacheRecreate() throws Exception 
{
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearSingleGetRequest.class,
+            (cache, keys) -> cache.get(keys.get(0)));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitOptimisticTxPutAllAndCacheRecreate() throws 
Exception {
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearTxPrepareRequest.class,
+            (cache, keys) -> {
+                Map<Integer, Integer> vals = new TreeMap<>();
+                vals.put(keys.get(0), 24);
+                vals.put(keys.get(1), 42);
+
+                cache.putAll(vals);
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitTxInvokeAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearTxPrepareRequest.class,
+            (cache, keys) -> {
+                cache.invoke(keys.get(0), new CacheEntryProcessor<Integer, 
Integer, Integer>() {
+                    @Override public Integer process(
+                        MutableEntry<Integer, Integer> entry,
+                        Object... arguments
+                    ) throws EntryProcessorException {
+                        if (entry.exists())
+                            return entry.getValue();
+
+                        entry.setValue(123);
+
+                        return entry.getValue();
+                    }
+                });
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPessimisticTxPutAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearLockRequest.class,
+            (cache, keys) -> {
+                try (Transaction tx = 
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+                    cache.put(keys.get(0), 42);
+
+                    tx.commit();
+                }
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPessimisticTxPutAllAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearLockRequest.class,
+            (cache, keys) -> {
+                try (Transaction tx = 
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+                    Map<Integer, Integer> vals = new TreeMap<>();
+                    vals.put(keys.get(0), 24);
+                    vals.put(keys.get(1), 42);
+
+                    cache.putAll(vals);
+
+                    tx.commit();
+                }
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPessimisticTxGetAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            TRANSACTIONAL,
+            GridNearLockRequest.class,
+            (cache, keys) -> {
+                try (Transaction tx = 
grid(1).transactions().txStart(PESSIMISTIC, SERIALIZABLE)) {
+                    cache.get(keys.get(0));
+
+                    tx.commit();
+                }
+            });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAtomicScanAndCacheRecreate() throws Exception {
+        testCacheOperationAndCacheRecreate(
+            ATOMIC,
+            GridCacheQueryRequest.class,
+            (cache, keys) -> {
+                ScanQuery<Integer, Integer> scanQuery = new ScanQuery<>();
+                scanQuery.setPageSize(1);
+
+                try (QueryCursor qry = cache.query(scanQuery)) {
+                    for (Object o : qry.getAll()) {
+                        IgniteBiTuple<Integer, Integer> tuple = 
(IgniteBiTuple<Integer, Integer>)o;
+
+                        throw new RuntimeException("Succesfully read 
unexpected value [k=" + tuple.getKey() + ", v=" + tuple.getValue());
+                    }
+                }
+            });
+    }
+
+    /**
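+     * Blocks the given cache message type on the client node, destroys and re-creates the cache
+     * on the server node, then unblocks the message and checks that the pending cache operation
+     * fails with a {@link CacheException}.
+     *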
+     * @param mode Cache atomicity mode.
+     * @param clazz Cache message type to be blocked before re-creating a 
cache.
+     * @param cacheOp Cache operation.
+     * @throws Exception If failed.
+     */
+    private void testCacheOperationAndCacheRecreate(
+        CacheAtomicityMode mode,
+        Class<? extends GridCacheIdMessage> clazz,
+        IgniteBiInClosure<IgniteCache<Integer, Integer>, List<Integer>> cacheOp
+    ) throws Exception {
+        IgniteEx g0 = grid(0);
+        IgniteEx client = grid(1);
+
+        // Initial loading.
+        IgniteCache<Integer, Integer> clientCache = createCache(client, mode);
+        for (int i = 0; i < 100; i++)
+            clientCache.put(i, i);
+
+        TestRecordingCommunicationSpi clientSpi = 
TestRecordingCommunicationSpi.spi(client);
+        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(g0);
+
+        // Block cache operation.
+        clientSpi.blockMessages((node, msg) -> {
+            if (clazz.isAssignableFrom(msg.getClass())) {
+                GridCacheIdMessage msg0 = (GridCacheIdMessage)msg;
+
+                if (msg0.cacheId() == 0 || msg0.cacheId() == 
CU.cacheId(CACHE_NAME))
+                    return true;
+            }
+
+            return false;
+        });
+
+        // Block notifying the client node about upcoming changes.
+        crdSpi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionsFullMessage)
+                return true;
+
+            return false;
+        });
+
+        List<Integer> primaryKeys = primaryKeys(g0.cache(CACHE_NAME), 2, 1);
+
+        // Initiate cache operation.
+        IgniteInternalFuture<?> updFut = runAsync(() -> 
cacheOp.apply(clientCache, primaryKeys));
+
+        // Wait until the operation is initiated on the client node.
+        clientSpi.waitForBlocked();
+
+        // Destroy the existing cache and re-create it once again in order to 
deliver the blocked cache message to the server node
+        // when the required cache is destroyed and new cache handlers are 
registered.
+        g0.destroyCache(clientCache.getName());
+
+        // Create a new cache with the same name.
+        IgniteCache newCache = createCache(g0, mode);
+
+        // Upload new values.
+        for (int i = 0; i < 100; i++)
+            newCache.put(i, i + 1_000);
+
+        // Unblock cache operation.
+        clientSpi.stopBlock();
+
+        try {
+            updFut.get(10, TimeUnit.SECONDS);
+
+            fail("Exception was not thrown.");
+        }
+        catch (Exception e) {
+            assertTrue("Unexpected exception [err=" + e + ']', X.hasCause(e, 
CacheException.class));
+        }
+        finally {
+            crdSpi.stopBlock();
+        }
+    }
+
+    /**
+     * Creates a cache using the given node as initiator node and the given 
atomicity mode.
+     *
+     * @param ignite Node to be used to initiate creating a new cache.
+     * @param mode Cache atomicity mode.
+     * @return Ignite cache.
+     */
+    private IgniteCache<Integer, Integer> createCache(IgniteEx ignite, 
CacheAtomicityMode mode) {
+        CacheConfiguration<Integer, Integer> cfg = new 
CacheConfiguration<>(CACHE_NAME);
+
+        cfg.setBackups(1)
+            .setReadFromBackup(false)
+            .setAtomicityMode(mode)
+            .setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        return ignite.getOrCreateCache(cfg);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index b6c14912874..09dc5e841b5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -121,14 +121,16 @@ public class GridCacheMessageSelfTest extends 
GridCommonAbstractTest {
             IgniteEx ignite1 = grid(1);
 
             ignite0.context().cache().context().io().addCacheHandler(
-                0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
+                TestBadMessage.class,
+                new CI2<UUID, GridCacheMessage>() {
                     @Override public void apply(UUID nodeId, GridCacheMessage 
msg) {
                         throw new RuntimeException("Test bad message 
exception");
                     }
                 });
 
             ignite1.context().cache().context().io().addCacheHandler(
-                0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
+                TestBadMessage.class,
+                new CI2<UUID, GridCacheMessage>() {
                     @Override public void apply(UUID nodeId, GridCacheMessage 
msg) {
                         throw new RuntimeException("Test bad message 
exception");
                     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index 354c26c048d..2a9c2c6208f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGetRestartTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheRecreateTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -41,6 +42,8 @@ import org.junit.runners.Suite;
 
     IgniteBinaryMetadataUpdateNodeRestartTest.class,
 
+    IgniteCacheRecreateTest.class,
+
     IgniteCacheGetRestartTest.class
 })
 public class IgniteCacheRestartTestSuite2 {

Reply via email to