Repository: ignite
Updated Branches:
  refs/heads/ignite-2.7 f3460b787 -> dfaeede8f


IGNITE-9550 fix Get operation returns null for a lost partition with READ_SAFE 
policy - Fixes #4947.

Signed-off-by: Dmitriy Govorukhin <[email protected]>

(cherry picked from commit cbbd9ad4a329d69ca88517ce2c480bffcf413e78)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfaeede8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfaeede8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfaeede8

Branch: refs/heads/ignite-2.7
Commit: dfaeede8f278b79b6a24368cf4a844c666d138cc
Parents: f3460b7
Author: Dmitriy Govorukhin <[email protected]>
Authored: Thu Oct 11 12:42:27 2018 +0300
Committer: Dmitriy Govorukhin <[email protected]>
Committed: Thu Oct 11 12:54:02 2018 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  22 +-
 .../processors/cache/GridCacheAdapter.java      |   9 +-
 .../GridCachePartitionExchangeManager.java      |  15 +-
 .../processors/cache/GridCacheProcessor.java    | 238 +++++++++++++++----
 .../processors/cache/GridCacheProxyImpl.java    |   7 +-
 .../processors/cache/IgniteCacheProxyImpl.java  |  95 +++++++-
 .../dht/GridPartitionedGetFuture.java           |   4 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   4 +-
 .../GridNearAtomicSingleUpdateFuture.java       |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  55 ++++-
 .../distributed/near/GridNearGetFuture.java     |  27 +--
 .../cache/query/GridCacheQueryAdapter.java      |   2 +-
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 .../continuous/CacheContinuousQueryManager.java |   2 +-
 .../cluster/DiscoveryDataClusterState.java      |  17 ++
 .../cluster/GridClusterStateProcessor.java      |   2 +-
 .../GridAffinityProcessorMemoryLeakTest.java    |   2 +-
 ...CacheResultIsNotNullOnPartitionLossTest.java | 213 +++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  22 +-
 21 files changed, 630 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d009c5d..b57530b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -408,7 +408,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
         ClientCacheChangeDummyDiscoveryMessage msg,
         boolean crd,
         AffinityTopologyVersion topVer,
-        DiscoCache discoCache) {
+        DiscoCache discoCache
+    ) {
         Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests();
 
         if (startReqs == null)
@@ -434,11 +435,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 DynamicCacheChangeRequest startReq = 
startReqs.get(desc.cacheName());
 
-                cctx.cache().prepareCacheStart(desc.cacheConfiguration(),
+                cctx.cache().prepareCacheStart(
+                    desc.cacheConfiguration(),
                     desc,
                     startReq.nearCacheConfiguration(),
                     topVer,
-                    startReq.disabledAfterStart());
+                    startReq.disabledAfterStart()
+                );
 
                 startedInfos.put(desc.cacheId(), 
startReq.nearCacheConfiguration() != null);
 
@@ -564,6 +567,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         cctx.cache().initCacheProxies(topVer, null);
 
+        startReqs.keySet().forEach(req -> 
cctx.cache().completeProxyInitialize(req));
+
         cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
 
         return startedInfos;
@@ -578,7 +583,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     private Set<Integer> processCacheCloseRequests(
         ClientCacheChangeDummyDiscoveryMessage msg,
         boolean crd,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer
+    ) {
         Set<String> cachesToClose = msg.cachesToClose();
 
         if (cachesToClose == null)
@@ -869,7 +875,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 assert cctx.cacheContext(cacheDesc.cacheId()) == null
                     : "Starting cache has not null context: " + 
cacheDesc.cacheName();
 
-                IgniteCacheProxyImpl cacheProxy = (IgniteCacheProxyImpl) 
cctx.cache().jcacheProxy(req.cacheName());
+                IgniteCacheProxyImpl cacheProxy = 
cctx.cache().jcacheProxy(req.cacheName(), false);
 
                 // If it has proxy then try to start it
                 if (cacheProxy != null) {
@@ -887,11 +893,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
             try {
                 if (startCache) {
-                    
cctx.cache().prepareCacheStart(req.startCacheConfiguration(),
+                    cctx.cache().prepareCacheStart(
+                        req.startCacheConfiguration(),
                         cacheDesc,
                         nearCfg,
                         evts.topologyVersion(),
-                        req.disabledAfterStart());
+                        req.disabledAfterStart()
+                    );
 
                     if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), 
cacheDesc.receivedFrom())) {
                         if 
(fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty())

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cf9337b..3156d6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4264,7 +4264,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                             assert topVer != null && topVer.topologyVersion() 
> 0 : tx;
 
-                            
ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get();
+                            AffinityTopologyVersion awaitVer = new 
AffinityTopologyVersion(
+                                topVer.topologyVersion() + 1, 0);
+
+                            
ctx.shared().exchange().affinityReadyFuture(awaitVer).get();
 
                             continue;
                         }
@@ -5031,8 +5034,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                                 assert topVer != null && 
topVer.topologyVersion() > 0 : tx;
 
+                                AffinityTopologyVersion awaitVer = new 
AffinityTopologyVersion(topVer.topologyVersion() + 1, 0);
+
                                 IgniteInternalFuture<?> topFut =
-                                    
ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1);
+                                    
ctx.shared().exchange().affinityReadyFuture(awaitVer);
 
                                 topFut.listen(new 
IgniteInClosure<IgniteInternalFuture<?>>() {
                                     @Override public void 
apply(IgniteInternalFuture<?> topFut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a18022e..dbfc3e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -898,7 +899,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         while (true) {
             GridDhtTopologyFuture cur = lastFinishedFut.get();
 
-            if (cur == null || 
fut.topologyVersion().compareTo(cur.topologyVersion()) > 0) {
+            if (fut.topologyVersion() != null && (cur == null || 
fut.topologyVersion().compareTo(cur.topologyVersion()) > 0)) {
                 if (lastFinishedFut.compareAndSet(cur, fut))
                     break;
             }
@@ -912,23 +913,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Future or {@code null} is future is already completed.
      */
     @Nullable public IgniteInternalFuture<AffinityTopologyVersion> 
affinityReadyFuture(AffinityTopologyVersion ver) {
-        GridDhtPartitionsExchangeFuture lastInitializedFut0 = 
lastInitializedFut;
-
-        if (lastInitializedFut0 != null && 
lastInitializedFut0.initialVersion().compareTo(ver) == 0) {
-            if (log.isTraceEnabled())
-                log.trace("Return lastInitializedFut for topology ready future 
" +
-                    "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
-
-            return lastInitializedFut0;
-        }
-
         AffinityTopologyVersion topVer = exchFuts.readyTopVer();
 
         if (topVer.compareTo(ver) >= 0) {
             if (log.isTraceEnabled())
                 log.trace("Return finished future for topology ready future 
[ver=" + ver + ", topVer=" + topVer + ']');
 
-            return null;
+            return new GridFinishedFuture<>(topVer);
         }
 
         GridFutureAdapter<AffinityTopologyVersion> fut = 
F.addIfAbsent(readyFuts, ver,

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f595ecf..a6de1e5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -35,6 +35,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.cache.configuration.FactoryBuilder;
 import javax.cache.expiry.EternalExpiryPolicy;
@@ -740,7 +741,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     }
 
     /**
-     *
      * @throws IgniteCheckedException If failed.
      */
     private void startCachesOnStart() throws IgniteCheckedException {
@@ -1216,6 +1216,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         sharedCtx.removeCacheContext(cctx);
 
         caches.remove(cctx.name());
+
+        completeProxyInitialize(cctx.name());
+
         jCacheProxies.remove(cctx.name());
 
         stoppedCaches.add(cctx.cache());
@@ -1415,9 +1418,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
             if (destroy && (pageStore = sharedCtx.pageStore()) != null) {
                 try {
                     pageStore.removeCacheData(new 
StoredCacheData(ctx.config()));
-                } catch (IgniteCheckedException e) {
+                }
+                catch (IgniteCheckedException e) {
                     U.error(log, "Failed to delete cache configuration data 
while destroying cache" +
-                            "[cache=" + ctx.name() + "]", e);
+                        "[cache=" + ctx.name() + "]", e);
                 }
             }
 
@@ -1818,6 +1822,67 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     }
 
     /**
+     * Registers {@code fut} on each restarting proxy whose cache is in {@code reqs} and is not disabled after start.
+     * @param reqs Cache requests to start.
+     * @param fut Completable future.
+     */
+    public void registrateProxyRestart(Map<String, DynamicCacheChangeRequest> 
reqs, GridFutureAdapter<?> fut) {
+        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
+            if (reqs.containsKey(proxy.getName()) &&
+                proxy.isRestarting() &&
+                !reqs.get(proxy.getName()).disabledAfterStart()
+            )
+                proxy.registrateFutureRestart(fut);
+        }
+    }
+
+    /**
+     * Completes proxy initialization of caches and finishes the restart of proxies for caches started in [initVer, doneVer].
+     * @param reqs Cache requests to start.
+     * @param initVer Init exchange version; the method returns immediately if {@code null}.
+     * @param doneVer Finish exchange version; the method returns immediately if {@code null}.
+     */
+    public void completeProxyRestart(
+        Map<String, DynamicCacheChangeRequest> reqs,
+        AffinityTopologyVersion initVer,
+        AffinityTopologyVersion doneVer
+    ) {
+        if (initVer == null || doneVer == null)
+            return;
+
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheContext<?, ?> cacheCtx = cache.context();
+
+            if (reqs.containsKey(cache.name()) ||
+                (cacheCtx.startTopologyVersion().compareTo(initVer) <= 0 ||
+                    cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0))
+                completeProxyInitialize(cache.name());
+
+            if (
+                cacheCtx.startTopologyVersion().compareTo(initVer) >= 0 &&
+                    cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0
+            ) {
+                IgniteCacheProxyImpl<?, ?> proxy = 
jCacheProxies.get(cache.name());
+
+                boolean canRestart = true;
+
+                DynamicCacheChangeRequest req = reqs.get(cache.name());
+
+                if (req != null) {
+                    canRestart = !req.disabledAfterStart();
+                }
+
+                if (proxy != null && proxy.isRestarting() && canRestart) {
+                    proxy.onRestarted(cacheCtx, cache);
+
+                    if (cacheCtx.dataStructuresCache())
+                        ctx.dataStructures().restart(proxy.internalProxy());
+                }
+            }
+        }
+    }
+
+    /**
      * Gets a collection of currently started caches.
      *
      * @return Collection of started cache names.
@@ -1832,8 +1897,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     }
 
     /**
-     * Gets public cache that can be used for query execution.
-     * If cache isn't created on current node it will be started.
+     * Gets public cache that can be used for query execution. If cache isn't 
created on current node it will be
+     * started.
      *
      * @param start Start cache.
      * @param inclLoc Include local caches.
@@ -2009,8 +2074,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
      * @param desc Cache descriptor.
      * @param reqNearCfg Near configuration if specified for client cache 
start request.
      * @param exchTopVer Current exchange version.
-     * @param disabledAfterStart If true, then we will discard restarting 
state from proxies. If false then we will change
-     *  state of proxies to restarting
+     * @param disabledAfterStart If true, then we will discard restarting 
state from proxies. If false then we will
+     * change state of proxies to restarting
      * @throws IgniteCheckedException If failed.
      */
     void prepareCacheStart(
@@ -2097,20 +2162,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         grp.onCacheStarted(cacheCtx);
 
         onKernalStart(cache);
-
-        IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName());
-
-        if (!disabledAfterStart && proxy != null && proxy.isRestarting()) {
-            proxy.onRestarted(cacheCtx, cache);
-
-            if (cacheCtx.dataStructuresCache())
-                ctx.dataStructures().restart(proxy.internalProxy());
-        }
     }
 
     /**
-     * Restarts proxies of caches if they was marked as restarting.
-     * Requires external synchronization - shouldn't be called concurrently 
with another caches restart.
+     * Restarts proxies of caches if they was marked as restarting. Requires 
external synchronization - shouldn't be
+     * called concurrently with another caches restart.
      */
     public void restartProxies() {
         for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
@@ -2184,7 +2240,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
 
         CacheGroupContext old = cacheGrps.put(desc.groupId(), grp);
 
-        if (!grp.systemCache()  && !U.IGNITE_MBEANS_DISABLED) {
+        if (!grp.systemCache() && !U.IGNITE_MBEANS_DISABLED) {
             try {
                 U.registerMBean(ctx.config().getMBeanServer(), 
ctx.igniteInstanceName(), CACHE_GRP_METRICS_MBEAN_GRP,
                     grp.cacheOrGroupName(), grp.mxBean(), 
CacheGroupMetricsMXBean.class);
@@ -2206,7 +2262,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
      */
     void blockGateway(String cacheName, boolean stop, boolean restart) {
         // Break the proxy before exchange future is done.
-        IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName);
+        IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, false);
 
         if (restart) {
             GridCacheAdapter<?, ?> cache = caches.get(cacheName);
@@ -2250,8 +2306,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
             if (proxy != null)
                 proxy.restart();
         }
-        else
+        else {
+            completeProxyInitialize(req.cacheName());
+
             proxy = jCacheProxies.remove(req.cacheName());
+        }
 
         if (proxy != null)
             proxy.context().gate().onStopped();
@@ -2292,12 +2351,12 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
 
             if (cacheCtx.startTopologyVersion().equals(startTopVer)) {
                 if (!jCacheProxies.containsKey(cacheCtx.name())) {
-                    IgniteCacheProxyImpl newProxy = new 
IgniteCacheProxyImpl(cache.context(), cache, false);
+                    IgniteCacheProxyImpl<?, ?> newProxy = new 
IgniteCacheProxyImpl(cache.context(), cache, false);
 
                     if (!cache.active())
                         newProxy.restart();
 
-                    jCacheProxies.putIfAbsent(cacheCtx.name(), newProxy);
+                    addjCacheProxy(cacheCtx.name(), newProxy);
                 }
 
                 if (cacheCtx.preloader() != null)
@@ -2315,6 +2374,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         Set<Integer> ids = null;
 
         for (String cacheName : cachesToClose) {
+            completeProxyInitialize(cacheName);
+
             blockGateway(cacheName, false, false);
 
             GridCacheContext ctx = 
sharedCtx.cacheContext(CU.cacheId(cacheName));
@@ -2345,6 +2406,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
             assert cache != null : cctx.name();
 
             jCacheProxies.put(cctx.name(), new 
IgniteCacheProxyImpl(cache.context(), cache, false));
+
+            completeProxyInitialize(cctx.name());
         }
         else {
             cctx.gate().onStopped();
@@ -2356,6 +2419,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
                 if (!cctx.affinityNode() && cctx.transactional())
                     
sharedCtx.tm().rollbackTransactionsForCache(cctx.cacheId());
 
+                completeProxyInitialize(cctx.name());
+
                 jCacheProxies.remove(cctx.name());
 
                 sharedCtx.database().checkpointReadLock();
@@ -2377,8 +2442,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     }
 
     /**
-     * Called during the rollback of the exchange partitions procedure
-     * in order to stop the given cache even if it's not fully initialized 
(e.g. failed on cache init stage).
+     * Called during the rollback of the exchange partitions procedure in 
order to stop the given cache even if it's not
+     * fully initialized (e.g. failed on cache init stage).
      *
      * @param exchActions Stop requests.
      */
@@ -2436,8 +2501,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
                 CacheGroupContext grp = cacheGrps.get(groupId);
 
                 if (grp != null && grp.persistenceEnabled() && 
sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
-                    GridCacheDatabaseSharedManager mngr = 
(GridCacheDatabaseSharedManager) sharedCtx.database();
-                    mngr.removeCheckpointListener((DbCheckpointListener) 
grp.offheap());
+                    GridCacheDatabaseSharedManager mngr = 
(GridCacheDatabaseSharedManager)sharedCtx.database();
+                    
mngr.removeCheckpointListener((DbCheckpointListener)grp.offheap());
                 }
             }
         }
@@ -3484,7 +3549,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
         assert cacheName != null;
 
-        IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
+        IgniteCacheProxy<?, ?> proxy = jcacheProxy(cacheName, false);
 
         if (proxy == null || proxy.isProxyClosed())
             return new GridFinishedFuture<>(); // No-op.
@@ -3657,6 +3722,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
 
     /**
      * Authorize creating cache.
+     *
      * @param cfg Cache configuration.
      * @param secCtx Optional security context.
      */
@@ -3670,6 +3736,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
 
     /**
      * Authorize dynamic cache management for this node.
+     *
      * @param req start/stop cache request.
      */
     private void authorizeCacheChange(DynamicCacheChangeRequest req) {
@@ -3752,7 +3819,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
             return 
cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
 
         if (msg instanceof DynamicCacheChangeFailureMessage)
-            
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage) msg, 
topVer);
+            
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, 
topVer);
 
         if (msg instanceof ClientCacheChangeDiscoveryMessage)
             
cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
@@ -3995,12 +4062,61 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         if (log.isDebugEnabled())
             log.debug("Getting cache for name: " + name);
 
-        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, 
V>)jCacheProxies.get(name);
+        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, 
V>)jcacheProxy(name, true);
 
         return jcache == null ? null : jcache.internalProxy();
     }
 
     /**
+     * Blocks until the proxy's init latch is released, logging each 2-second timeout while still waiting.
+     *
+     * @param jcache Cache proxy, may be {@code null} (then nothing is awaited).
+     */
+    private void awaitInitializeProxy(IgniteCacheProxyImpl<?, ?> jcache) {
+        if (jcache != null) {
+            CountDownLatch initLatch = jcache.getInitLatch();
+
+            try {
+                while (initLatch.getCount() > 0) {
+                    boolean released = initLatch.await(2000, TimeUnit.MILLISECONDS);
+                    // await() returns false only on timeout, so a timely release is not reported as a failure.
+                    if (!released && log.isInfoEnabled())
+                        log.info("Failed to wait proxy initialization, cache=" 
+ jcache.getName() +
+                            ", localNodeId=" + ctx.localNodeId());
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Interrupt flag is restored above; stop waiting and let the caller continue.
+            }
+        }
+    }
+
+    /**
+     * @param name Name of the cache whose proxy init latch is counted down; only logs if no such proxy exists.
+     */
+    public void completeProxyInitialize(String name) {
+        IgniteCacheProxyImpl<?, ?> jcache = jCacheProxies.get(name);
+
+        if (jcache != null) {
+            CountDownLatch proxyInitLatch = jcache.getInitLatch();
+
+            if (proxyInitLatch.getCount() > 0) {
+                if (log.isInfoEnabled())
+                    log.info("Finish proxy initialization, cacheName=" + name +
+                        ", localNodeId=" + ctx.localNodeId());
+
+                proxyInitLatch.countDown();
+            }
+        }
+        else {
+            if (log.isInfoEnabled())
+                log.info("Can not finish proxy initialization because proxy 
does not exist, cacheName=" + name +
+                    ", localNodeId=" + ctx.localNodeId());
+        }
+    }
+
+    /**
      * @param name Cache name.
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
@@ -4016,18 +4132,21 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public <K, V> IgniteInternalCache<K, V> getOrStartCache(String name, 
CacheConfiguration ccfg) throws IgniteCheckedException {
+    public <K, V> IgniteInternalCache<K, V> getOrStartCache(
+        String name,
+        CacheConfiguration ccfg
+    ) throws IgniteCheckedException {
         assert name != null;
 
         if (log.isDebugEnabled())
             log.debug("Getting cache for name: " + name);
 
-        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(name);
+        IgniteCacheProxy<?, ?> cache = jcacheProxy(name, true);
 
         if (cache == null) {
             dynamicStartCache(ccfg, name, null, false, ccfg == null, 
true).get();
 
-            cache = jCacheProxies.get(name);
+            cache = jcacheProxy(name, true);
         }
 
         return cache == null ? null : (IgniteInternalCache<K, 
V>)cache.internalProxy();
@@ -4071,13 +4190,21 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
      */
     private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) {
         if (ctx.discovery().localNode().isClient()) {
-            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, 
V>)jCacheProxies.get(name);
+            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, 
V>)jcacheProxy(name, true);
 
             if (proxy == null) {
                 GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);
 
-                if (cacheAdapter != null)
+                if (cacheAdapter != null) {
                     proxy = new IgniteCacheProxyImpl(cacheAdapter.context(), 
cacheAdapter, false);
+
+                    IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, 
(IgniteCacheProxyImpl<?, ?>)proxy);
+
+                    if (prev != null)
+                        proxy = (IgniteCacheProxy<K, V>)prev;
+
+                    completeProxyInitialize(proxy.getName());
+                }
             }
 
             assert proxy != null : name;
@@ -4109,7 +4236,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         if (!desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is 
a system cache: " + name);
 
-        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, 
V>)jCacheProxies.get(name);
+        IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, 
V>)jcacheProxy(name, true);
 
         if (jcache == null)
             throw new IllegalArgumentException("Cache is not started: " + 
name);
@@ -4150,16 +4277,16 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is 
a system cache: " + cacheName);
 
-        IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(cacheName);
+        IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, true);
 
         // Try to start cache, there is no guarantee that cache will be 
instantiated.
-        if (cache == null) {
+        if (proxy == null) {
             dynamicStartCache(null, cacheName, null, false, failIfNotStarted, 
checkThreadTx).get();
 
-            cache = jCacheProxies.get(cacheName);
+            proxy = jcacheProxy(cacheName, true);
         }
 
-        return cache != null ? (IgniteCacheProxy<K, V>)cache.gatewayWrapper() 
: null;
+        return proxy != null ? (IgniteCacheProxy<K, V>)proxy.gatewayWrapper() 
: null;
     }
 
     /**
@@ -4276,13 +4403,21 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
     public <K, V> IgniteCacheProxy<K, V> jcache(String name) {
         assert name != null;
 
-        IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>) 
jCacheProxies.get(name);
+        IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, 
V>)jcacheProxy(name, true);
 
         if (cache == null) {
             GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);
 
-            if (cacheAdapter != null)
+            if (cacheAdapter != null) {
                 cache = new IgniteCacheProxyImpl(cacheAdapter.context(), 
cacheAdapter, false);
+
+                IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, 
(IgniteCacheProxyImpl<?, ?>)cache);
+
+                if (prev != null)
+                    cache = (IgniteCacheProxy<K, V>)prev;
+
+                completeProxyInitialize(cache.getName());
+            }
         }
 
         if (cache == null)
@@ -4293,10 +4428,25 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor
 
     /**
      * @param name Cache name.
+     * @param awaitInit Await proxy initialization.
      * @return Cache proxy.
      */
-    @Nullable public IgniteCacheProxy jcacheProxy(String name) {
-        return jCacheProxies.get(name);
+    @Nullable public IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, 
boolean awaitInit) {
+        IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name);
+
+        if (awaitInit)
+            awaitInitializeProxy(cache);
+
+        return cache;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param proxy Cache proxy.
+     * @return Previous cache proxy.
+     */
+    @Nullable public IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, 
IgniteCacheProxyImpl<?, ?> proxy) {
+        return jCacheProxies.putIfAbsent(name, proxy);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index a9ce448..ebb8b5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -90,8 +90,11 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
      * @param delegate Delegate object.
      * @param opCtx Optional operation context which will be passed to gateway.
      */
-    public GridCacheProxyImpl(GridCacheContext<K, V> ctx, 
IgniteInternalCache<K, V> delegate,
-        @Nullable CacheOperationContext opCtx) {
+    public GridCacheProxyImpl(
+        GridCacheContext<K, V> ctx,
+        IgniteInternalCache<K, V> delegate,
+        @Nullable CacheOperationContext opCtx
+    ) {
         assert ctx != null;
         assert delegate != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 4989efb..776e1cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
@@ -136,16 +137,23 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     private CacheManager cacheMgr;
 
     /** Future indicates that cache is under restarting. */
-    private final AtomicReference<GridFutureAdapter<Void>> restartFut;
+    private final AtomicReference<RestartFuture> restartFut;
 
     /** Flag indicates that proxy is closed. */
     private volatile boolean closed;
 
+    /** Proxy initialization latch used to await final completion after the proxy 
is created. For example,
+     * a proxy may be created before the exchange has completed; if we then try 
to perform a cache
+     * operation, we get the last finished exchange future (needed for validation)
+     * for the previous version rather than for the current one.
+     */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
     public IgniteCacheProxyImpl() {
-        restartFut = new AtomicReference<GridFutureAdapter<Void>>(null);
+        restartFut = new AtomicReference<>(null);
     }
 
     /**
@@ -158,7 +166,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
             @NotNull IgniteInternalCache<K, V> delegate,
             boolean async
     ) {
-        this(ctx, delegate, new 
AtomicReference<GridFutureAdapter<Void>>(null), async);
+        this(ctx, delegate, new AtomicReference<>(null), async);
     }
 
     /**
@@ -169,7 +177,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     private IgniteCacheProxyImpl(
         @NotNull GridCacheContext<K, V> ctx,
         @NotNull IgniteInternalCache<K, V> delegate,
-        @NotNull AtomicReference<GridFutureAdapter<Void>> restartFut,
+        @NotNull AtomicReference<RestartFuture> restartFut,
         boolean async
     ) {
         super(async);
@@ -184,6 +192,14 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     }
 
     /**
+     *
+     * @return Init latch.
+     */
+    public CountDownLatch getInitLatch(){
+        return initLatch;
+    }
+
+    /**
      * @return Context.
      */
     @Override public GridCacheContext<K, V> context() {
@@ -1766,6 +1782,8 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      * @return Internal proxy.
      */
     @Override public GridCacheProxyImpl<K, V> internalProxy() {
+        checkRestart();
+
         return new GridCacheProxyImpl<>(ctx, delegate, 
ctx.operationContextPerCall());
     }
 
@@ -1842,11 +1860,10 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
      */
     public void checkRestart() {
-        GridFutureAdapter<Void> currentFut = this.restartFut.get();
+        RestartFuture currentFut = restartFut.get();
 
         if (currentFut != null)
-            throw new IgniteCacheRestartingException(new 
IgniteFutureImpl<>(currentFut), "Cache is restarting: " +
-                    context().name());
+            currentFut.checkRestartOrAwait();
     }
 
     /**
@@ -1860,9 +1877,9 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      * Restarts this cache proxy.
      */
     public boolean restart() {
-        GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>();
+        RestartFuture restartFut = new RestartFuture(ctx.name());
 
-        final GridFutureAdapter<Void> curFut = this.restartFut.get();
+         RestartFuture curFut = this.restartFut.get();
 
         boolean changed = this.restartFut.compareAndSet(curFut, restartFut);
 
@@ -1880,12 +1897,22 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     }
 
     /**
+     * @param fut Finish restart future.
+     */
+    public void registrateFutureRestart(GridFutureAdapter<?> fut){
+        RestartFuture currentFut = restartFut.get();
+
+        if (currentFut != null)
+            currentFut.addRestartFinishedFuture(fut);
+    }
+
+    /**
      * If proxy is already being restarted, returns future to wait on, else 
restarts this cache proxy.
      *
      * @return Future to wait on, or null.
      */
     public GridFutureAdapter<Void> opportunisticRestart() {
-        GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>();
+        RestartFuture restartFut = new RestartFuture(ctx.name());
 
         while (true) {
             if (this.restartFut.compareAndSet(null, restartFut))
@@ -1905,7 +1932,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      * @param delegate New delegate.
      */
     public void onRestarted(GridCacheContext ctx, IgniteInternalCache 
delegate) {
-        GridFutureAdapter<Void> restartFut = this.restartFut.get();
+        RestartFuture restartFut = this.restartFut.get();
 
         assert restartFut != null;
 
@@ -1917,6 +1944,52 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
         restartFut.onDone();
     }
 
+    /**
+     *
+     */
+    private class RestartFuture extends GridFutureAdapter<Void> {
+        /** */
+        private final String name;
+
+        /** */
+        private volatile GridFutureAdapter<?> restartFinishFut;
+
+        /** */
+        private RestartFuture(String name) {
+            this.name = name;
+        }
+
+        /**
+         *
+         */
+        void checkRestartOrAwait() {
+            GridFutureAdapter<?> fut = restartFinishFut;
+
+            if (fut != null) {
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw U.convertException(e);
+                }
+
+                return;
+            }
+
+            throw new IgniteCacheRestartingException(
+                new IgniteFutureImpl<>(this),
+                "Cache is restarting: " + name
+            );
+        }
+
+        /**
+         *
+         */
+        void addRestartFinishedFuture(GridFutureAdapter<?> fut) {
+            restartFinishFut = fut;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteCacheProxyImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 1512963..e09a599 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -803,7 +803,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
                 AffinityTopologyVersion updTopVer =
                     new 
AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
 
-                cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
                     new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             try {
@@ -869,7 +869,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
                 }
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<AffinityTopologyVersion> topFut = 
cctx.affinity().affinityReadyFuture(rmtTopVer);
+                IgniteInternalFuture<AffinityTopologyVersion> topFut = 
cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
 
                 topFut.listen(new 
CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 8ed6695..4dc72b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -615,7 +615,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
             }
 
             if (canRemap) {
-                IgniteInternalFuture<AffinityTopologyVersion> topFut = 
cctx.affinity().affinityReadyFuture(rmtTopVer);
+                IgniteInternalFuture<AffinityTopologyVersion> topFut = 
cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
 
                 topFut.listen(new 
CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void 
applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) {
@@ -740,7 +740,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
             AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
                 Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
 
-            cctx.affinity().affinityReadyFuture(updTopVer).listen(
+            cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(
                 new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 82a7964..4c0d2db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -357,7 +357,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
             ClusterTopologyCheckedException cause = new 
ClusterTopologyCheckedException(
                 "Failed to update keys, topology changed while execute atomic 
update inside transaction.");
 
-            
cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+            
cause.retryReadyFuture(cctx.shared().exchange().affinityReadyFuture(remapTopVer));
 
             e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index fd6b63e..28ebfb1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -485,7 +485,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             ClusterTopologyCheckedException cause = new 
ClusterTopologyCheckedException(
                 "Failed to update keys, topology changed while execute atomic 
update inside transaction.");
 
-            
cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+            
cause.retryReadyFuture(cctx.shared().exchange().affinityReadyFuture(remapTopVer));
 
             e.add(remapKeys, cause);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2052c36..514b81f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -72,6 +72,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
@@ -333,6 +334,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** Latest (by update sequences) full message with exchangeId == null, 
need to be processed right after future is done. */
     private GridDhtPartitionsFullMessage delayedLatestMsg;
 
+    /** Future to wait for all exchange listeners to complete. */
+    private final GridFutureAdapter<?> afterLsnrCompleteFut = new 
GridFutureAdapter<>();
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -774,6 +778,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
             }
 
+            
cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), 
afterLsnrCompleteFut);
+
             updateTopologies(crdNode, 
cctx.coordinators().currentCoordinator());
 
             switch (exchange) {
@@ -1830,6 +1836,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, 
@Nullable Throwable err) {
+        assert res != null || err != null : "TopVer=" + res + ", err=" + err;
+
         if (isDone() || !done.compareAndSet(false, true))
             return false;
 
@@ -1907,18 +1915,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             cctx.coordinators().onExchangeDone(exchCtx.newMvccCoordinator(), 
exchCtx.events().discoveryCache(),
                 exchCtx.activeQueries());
 
+        // Create and destroy caches and cache proxies.
         cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
 
-        cctx.exchange().onExchangeDone(res, initialVersion(), err);
-
         cctx.kernalContext().authentication().onActivate();
 
-        if (exchActions != null && err == null)
-            exchActions.completeRequestFutures(cctx, null);
-
-        if (stateChangeExchange() && err == null)
-            
cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest());
-
         Map<T2<Integer, Integer>, Long> localReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
 
         if (localReserved != null) {
@@ -1946,7 +1947,29 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             cctx.walState().changeLocalStatesOnExchangeDone(res);
         }
 
+        final Throwable err0 = err;
+
+        // Should execute this listener first, before any external listeners.
+        // Listeners use stack as data structure.
+        listen(f -> {
+            // Update last finished future first.
+            cctx.exchange().lastFinishedFuture(this);
+
+            // Complete any affReady futures and update last exchange done 
version.
+            cctx.exchange().onExchangeDone(res, initialVersion(), err0);
+
+            
cctx.cache().completeProxyRestart(resolveCacheRequests(exchActions), 
initialVersion(), res);
+
+            if (exchActions != null && err0 == null)
+                exchActions.completeRequestFutures(cctx, null);
+
+            if (stateChangeExchange() && err0 == null)
+                
cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest());
+        });
+
         if (super.onDone(res, err)) {
+            afterLsnrCompleteFut.onDone();
+
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + 
cctx.localNodeId() + ", exchange= " + this +
                     ", durationFromInit=" + (U.currentTimeMillis() - initTs) + 
']');
@@ -1973,8 +1996,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
 
             if (err == null) {
-                cctx.exchange().lastFinishedFuture(this);
-
                 if (exchCtx != null && (exchCtx.events().hasServerLeft() || 
exchCtx.events().hasServerJoin())) {
                     ExchangeDiscoveryEvents evts = exchCtx.events();
 
@@ -1993,6 +2014,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param exchangeActions Exchange actions.
+     * @return Map of cache names and start descriptors.
+     */
+    private Map<String, DynamicCacheChangeRequest> 
resolveCacheRequests(ExchangeActions exchangeActions) {
+        if (exchangeActions == null)
+            return Collections.emptyMap();
+
+        return exchangeActions.cacheStartRequests()
+            .stream()
+            .map(ExchangeActions.CacheActionData::request)
+            .collect(Collectors.toMap(DynamicCacheChangeRequest::cacheName, r 
-> r));
+    }
+
+    /**
      * Method waits for new caches registration and cache configuration 
persisting to disk.
      */
     private void waitUntilNewCachesAreRegistered() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 87801a9..d7800ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -942,21 +942,18 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                 AffinityTopologyVersion updTopVer =
                     new 
AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
 
-                cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            try {
-                                // Remap.
-                                map(keys.keySet(), F.t(node, keys), fut.get());
-
-                                onDone(Collections.<K, V>emptyMap());
-                            }
-                            catch (IgniteCheckedException e) {
-                                GridNearGetFuture.this.onDone(e);
-                            }
-                        }
+                
cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(f -> {
+                    try {
+                        // Remap.
+                        map(keys.keySet(), F.t(node, keys), f.get());
+
+                        onDone(Collections.<K, V>emptyMap());
+
+                    }
+                    catch (IgniteCheckedException e) {
+                        GridNearGetFuture.this.onDone(e);
                     }
-                );
+                });
             }
         }
 
@@ -1005,7 +1002,7 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                 }
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<AffinityTopologyVersion> topFut = 
cctx.affinity().affinityReadyFuture(rmtTopVer);
+                IgniteInternalFuture<AffinityTopologyVersion> topFut =  
cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
 
                 topFut.listen(new 
CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void applyx(

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 0e3ab43..ce9a175 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -864,7 +864,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
                     assert waitVer != null;
 
-                    retryFut = cctx.affinity().affinityReadyFuture(waitVer);
+                    retryFut = 
cctx.shared().exchange().affinityReadyFuture(waitVer);
                 }
                 else if (e.hasCause(ClusterTopologyCheckedException.class)) {
                     ClusterTopologyCheckedException topEx = X.cause(e, 
ClusterTopologyCheckedException.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 307a4ea..d1640c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -641,7 +641,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         if (!cctx.isLocal()) {
             AffinityTopologyVersion topVer = initTopVer;
 
-            cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
+            
cacheContext(ctx).shared().exchange().affinityReadyFuture(topVer).get();
 
             for (int partId = 0; partId < 
cacheContext(ctx).affinity().partitions(); partId++)
                 getOrCreatePartitionRecovery(ctx, partId, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ab60f47..3e7763c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -403,7 +403,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 topVer,
                 (byte)0);
 
-            IgniteCacheProxy jcache = 
cctx.kernalContext().cache().jcacheProxy(cctx.name());
+            IgniteCacheProxy jcache = 
cctx.kernalContext().cache().jcacheProxy(cctx.name(), true);
 
             assert jcache != null : "Failed to get cache proxy [name=" + 
cctx.name() +
                 ", locStart=" + cctx.startTopologyVersion() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index a28bfc9..b3879ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -201,6 +201,23 @@ public class DiscoveryDataClusterState implements 
Serializable {
     }
 
     /**
+     *
+     * @return {@code True} if baseline changed, {@code False} if not.
+     */
+    public boolean baselineChanged() {
+        BaselineTopology prevBLT = previousBaselineTopology();
+        BaselineTopology curBLT = baselineTopology();
+
+        if (prevBLT == null && curBLT != null)
+            return true;
+
+        if (prevBLT!= null && curBLT != null)
+            return !prevBLT.equals(curBLT);
+
+        return false;
+    }
+
+    /**
      * @return {@code True} if baseline topology is set in the cluster. {@code 
False} otherwise.
      */
     public boolean hasBaselineTopology() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index e933757..d492846 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -205,7 +205,7 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
                     return transitionRes;
                 }
                 else
-                    return false;
+                    return globalState.baselineChanged();
             }
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
index 3b6857d..d6c5727 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java
@@ -41,7 +41,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.getInteger;
 @GridCommonTest(group = "Affinity Processor")
 public class GridAffinityProcessorMemoryLeakTest extends 
GridCommonAbstractTest {
     /** Max value for affinity history size name. Should be the same as in 
GridAffinityAssignmentCache.MAX_HIST_SIZE */
-    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 
100);
+    private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 
500);
 
     /** Cache name. */
     private static final String CACHE_NAME = "cache";

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
new file mode 100644
index 0000000..ceafc9e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that cache reads never return {@code null} on partition loss.
+ */
+public class CacheResultIsNotNullOnPartitionLossTest extends 
GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Number of servers to be started. */
+    private static final int SERVERS = 10;
+
+    /** Index of node that is going to be the only client node. */
+    private static final int CLIENT_IDX = SERVERS;
+
+    /** Number of cache entries to insert into the test cache. */
+    private static final int CACHE_ENTRIES_CNT = 10_000;
+
+    /** True if {@link #getConfiguration(String)} is expected to configure 
client node on next invocations. */
+    private boolean isClient;
+
+    /** Client Ignite instance. */
+    private IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setBackups(0)
+                
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
+        );
+
+        if (isClient)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        startGrids(SERVERS);
+
+        isClient = true;
+
+        client = startGrid(CLIENT_IDX);
+
+        try (IgniteDataStreamer<Integer, Integer> dataStreamer = 
client.dataStreamer(DEFAULT_CACHE_NAME)) {
+            dataStreamer.allowOverwrite(true);
+
+            for (int i = 0; i < CACHE_ENTRIES_CNT; i++)
+                dataStreamer.addData(i, i);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheResultIsNotNullOnClient() throws Exception {
+        testCacheResultIsNotNull0(client);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheResultIsNotNullOnLastServer() throws Exception {
+        testCacheResultIsNotNull0(grid(SERVERS - 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheResultIsNotNullOnServer() throws Exception {
+        testCacheResultIsNotNull0(grid(SERVERS - 2));
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    private void testCacheResultIsNotNull0(IgniteEx ignite) throws Exception {
+        AtomicBoolean stopReading = new AtomicBoolean();
+
+        AtomicReference<Throwable> unexpectedThrowable = new 
AtomicReference<>();
+
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        CountDownLatch readerThreadStarted = new CountDownLatch(1);
+
+        IgniteInternalFuture<Boolean> nullCacheValFoundFut = 
GridTestUtils.runAsync(() -> {
+            readerThreadStarted.countDown();
+
+            while (!stopReading.get())
+                for (int i = 0; i < CACHE_ENTRIES_CNT && !stopReading.get(); 
i++) {
+                    try {
+                        if (cache.get(i) == null)
+                            return true;
+                    }
+                    catch (Throwable t) {
+                        if (expectedThrowableClass(t)) {
+                            try {
+                                cache.put(i, i);
+
+                                unexpectedThrowable.set(new 
RuntimeException("Cache put was successful for entry " + i));
+                            }
+                            catch (Throwable t2) {
+                                if (!expectedThrowableClass(t2))
+                                    unexpectedThrowable.set(t2);
+                            }
+                        }
+                        else
+                            unexpectedThrowable.set(t);
+
+                        break;
+                    }
+                }
+
+            return false;
+        });
+
+        try {
+            readerThreadStarted.await(1, TimeUnit.SECONDS);
+
+            for (int i = 0; i < SERVERS - 1; i++) {
+                Thread.sleep(50L);
+
+                grid(i).close();
+            }
+        }
+        finally {
+            // Ask reader thread to finish its execution.
+            stopReading.set(true);
+        }
+
+        assertFalse("Null value was returned by cache.get instead of 
exception.", nullCacheValFoundFut.get());
+
+        Throwable throwable = unexpectedThrowable.get();
+        if (throwable != null) {
+            throwable.printStackTrace();
+
+            fail(throwable.getMessage());
+        }
+    }
+
+    /**
+     * Checks whether the throwable has a cause of one of the expected types.
+     */
+    private boolean expectedThrowableClass(Throwable throwable) {
+        return X.hasCause(
+            throwable,
+            CacheInvalidStateException.class,
+            ClusterTopologyCheckedException.class,
+            IllegalStateException.class,
+            NodeStoppingException.class
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 370fa49..cb4be14 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -97,6 +97,7 @@ import 
org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction
 import 
org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
 import 
org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheResultIsNotNullOnPartitionLossTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
@@ -340,6 +341,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(IgniteCacheContainsKeyAtomicTest.class);
 
+        suite.addTestSuite(CacheResultIsNotNullOnPartitionLossTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfaeede8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 48d4a8d..081add8 100644
--- 
a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ 
b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -31,9 +31,13 @@ import java.util.concurrent.TimeoutException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -201,9 +205,24 @@ public class KafkaIgniteStreamerSelfTest extends 
GridCommonAbstractTest {
             final CountDownLatch latch = new CountDownLatch(CNT);
 
             IgniteBiPredicate<UUID, CacheEvent> locLsnr = new 
IgniteBiPredicate<UUID, CacheEvent>() {
+                @IgniteInstanceResource
+                private Ignite ig;
+
+                @LoggerResource
+                private IgniteLogger log;
+
+                /** {@inheritDoc} */
                 @Override public boolean apply(UUID uuid, CacheEvent evt) {
                     latch.countDown();
 
+                    if (log.isInfoEnabled()) {
+                        IgniteEx igEx = (IgniteEx)ig;
+
+                        UUID nodeId = igEx.localNode().id();
+
+                        log.info("Recive event=" + evt + ", nodeId=" + nodeId);
+                    }
+
                     return true;
                 }
             };
@@ -211,7 +230,8 @@ public class KafkaIgniteStreamerSelfTest extends 
GridCommonAbstractTest {
             
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr,
 null, EVT_CACHE_OBJECT_PUT);
 
             // Checks all events successfully processed in 10 seconds.
-            assertTrue(latch.await(10, TimeUnit.SECONDS));
+            assertTrue("Failed to wait latch completion, still wait " + 
latch.getCount() + " events",
+                latch.await(10, TimeUnit.SECONDS));
 
             for (Map.Entry<String, String> entry : keyValMap.entrySet())
                 assertEquals(entry.getValue(), cache.get(entry.getKey()));

Reply via email to