Repository: ignite
Updated Branches:
  refs/heads/master 74d342d66 -> 18aaee039


http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 54c3cae..09445f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -23,10 +23,9 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -38,8 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import 
org.apache.ignite.internal.processors.cache.GridCacheUtils.BackupPostProcessingClosure;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
@@ -50,12 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -66,17 +58,8 @@ import org.jetbrains.annotations.Nullable;
  *
  */
 public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdapter<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Logger. */
-    private static IgniteLogger log;
-
     /** Transaction. */
-    private IgniteTxLocalEx tx;
+    private final IgniteTxLocalEx tx;
 
     /** */
     private GridCacheVersion ver;
@@ -111,7 +94,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
         boolean keepCacheObjects,
         boolean recovery
     ) {
-        super(cctx,
+        super(
+            cctx,
             keys,
             readThrough,
             forcePrimary,
@@ -122,18 +106,16 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
             skipVals,
             needVer,
             keepCacheObjects,
-            recovery);
+            recovery
+        );
 
         assert !F.isEmpty(keys);
 
         this.tx = tx;
 
-        futId = IgniteUuid.randomUuid();
-
         ver = tx == null ? cctx.versions().next() : tx.xidVersion();
 
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, 
GridNearGetFuture.class);
+        initLogger(GridNearGetFuture.class);
     }
 
     /**
@@ -147,74 +129,22 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
         if (lockedTopVer != null) {
             canRemap = false;
 
-            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, 
Boolean>>emptyMap(), lockedTopVer);
+            map(keys, Collections.emptyMap(), lockedTopVer);
         }
         else {
             AffinityTopologyVersion mapTopVer = topVer;
 
             if (mapTopVer == null) {
-                mapTopVer = tx == null ?
-                    (canRemap ? cctx.affinity().affinityTopologyVersion() : 
cctx.shared().exchange().readyAffinityVersion()) :
-                    tx.topologyVersion();
+                mapTopVer = tx == null ? 
cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
             }
 
-            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, 
Boolean>>emptyMap(), mapTopVer);
+            map(keys, Collections.emptyMap(), mapTopVer);
         }
 
         markInitialized();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // Should not flip trackable flag from true to false since get future 
can be remapped.
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        boolean found = false;
-
-        for (IgniteInternalFuture<Map<K, V>> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.node().id().equals(nodeId)) {
-                    found = true;
-
-                    f.onNodeLeft();
-                }
-            }
-
-        return found;
-    }
-
-    /**
-     * @param nodeId Sender.
-     * @param res Result.
-     */
-    @Override public void onResult(UUID nodeId, GridNearGetResponse res) {
-        for (IgniteInternalFuture<Map<K, V>> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.futureId().equals(res.miniId())) {
-                    assert f.node().id().equals(nodeId);
-
-                    f.onResult(res);
-                }
-            }
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onDone(Map<K, V> res, Throwable err) {
         if (super.onDone(res, err)) {
             // Don't forget to clean up.
@@ -229,11 +159,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
         return false;
     }
 
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
+    /** {@inheritDoc} */
+    @Override protected boolean isMini(IgniteInternalFuture<?> f) {
         return f.getClass().equals(MiniFuture.class);
     }
 
@@ -242,10 +169,10 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
      * @param mapped Mappings to check for duplicates.
      * @param topVer Topology version to map on.
      */
-    private void map(
+    @Override protected void map(
         Collection<KeyCacheObject> keys,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
-        final AffinityTopologyVersion topVer
+        AffinityTopologyVersion topVer
     ) {
         Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
 
@@ -268,7 +195,7 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
             try {
                 // Assign keys to primary nodes.
                 for (KeyCacheObject key : keys)
-                    savedEntries = map(key, mappings, topVer, mapped, 
savedEntries);
+                    savedEntries = map(key, topVer, mappings, mapped, 
savedEntries);
 
                 success = true;
             }
@@ -292,23 +219,24 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
         if (isDone())
             return;
 
-        final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != 
null ? savedEntries :
-            Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap();
+        Map<KeyCacheObject, GridNearCacheEntry> saved =
+            savedEntries != null ? savedEntries : Collections.emptyMap();
 
-        final int keysSize = keys.size();
+        int keysSize = keys.size();
 
         // Create mini futures.
         for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> 
entry : mappings.entrySet()) {
-            final ClusterNode n = entry.getKey();
+            ClusterNode n = entry.getKey();
 
-            final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = 
entry.getValue();
+            LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = 
entry.getValue();
 
             assert !mappedKeys.isEmpty();
 
             // If this is the primary or backup node for the keys.
             if (n.isLocal()) {
-                final GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
-                    dht().getDhtAsync(n.id(),
+                GridDhtFuture<Collection<GridCacheEntryInfo>> fut = dht()
+                    .getDhtAsync(
+                        n.id(),
                         -1,
                         mappedKeys,
                         false,
@@ -320,74 +248,52 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                         skipVals,
                         recovery,
                         null,
-                        null); // TODO IGNITE-7371
+                        null
+                    ); // TODO IGNITE-7371
 
-                final Collection<Integer> invalidParts = 
fut.invalidPartitions();
+                Collection<Integer> invalidParts = fut.invalidPartitions();
 
                 if (!F.isEmpty(invalidParts)) {
                     Collection<KeyCacheObject> remapKeys = new 
ArrayList<>(keysSize);
 
                     for (KeyCacheObject key : keys) {
-                        if (key != null && 
invalidParts.contains(cctx.affinity().partition(key)))
+                        int part = cctx.affinity().partition(key);
+
+                        if (key != null && invalidParts.contains(part)) {
+                            addNodeAsInvalid(n, part, topVer);
+
                             remapKeys.add(key);
+                        }
                     }
 
                     AffinityTopologyVersion updTopVer = 
cctx.shared().exchange().readyAffinityVersion();
 
-                    assert updTopVer.compareTo(topVer) > 0 : "Got invalid 
partitions for local node but topology version did " +
-                        "not change [topVer=" + topVer + ", updTopVer=" + 
updTopVer +
-                        ", invalidParts=" + invalidParts + ']';
-
                     // Remap recursively.
                     map(remapKeys, mappings, updTopVer);
                 }
 
                 // Add new future.
-                add(fut.chain(new 
C1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>, Map<K, V>>() {
-                    @Override public Map<K, V> 
apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
-                        try {
-                            return loadEntries(n.id(), mappedKeys.keySet(), 
fut.get(), saved, topVer);
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Failed to get values from dht cache 
[fut=" + fut + "]", e);
+                add(fut.chain(f -> {
+                    try {
+                        return loadEntries(n.id(), mappedKeys.keySet(), 
f.get(), saved, topVer);
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to get values from dht cache 
[fut=" + fut + "]", e);
 
-                            onDone(e);
+                        onDone(e);
 
-                            return Collections.emptyMap();
-                        }
+                        return Collections.emptyMap();
                     }
                 }));
             }
             else {
-                if (!trackable) {
-                    trackable = true;
+                registrateFutureInMvccManager(this);
 
-                    cctx.mvcc().addFuture(this, futId);
-                }
+                MiniFuture miniFuture = new MiniFuture(n, mappedKeys, saved, 
topVer);
 
-                MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer,
-                    CU.createBackupPostProcessingClosure(topVer, log, cctx, 
null, expiryPlc, readThrough, skipVals));
-
-                GridCacheMessage req = new GridNearGetRequest(
-                    cctx.cacheId(),
-                    futId,
-                    fut.futureId(),
-                    ver,
-                    mappedKeys,
-                    readThrough,
-                    topVer,
-                    subjId,
-                    taskName == null ? 0 : taskName.hashCode(),
-                    expiryPlc != null ? expiryPlc.forCreate() : -1L,
-                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
-                    true,
-                    skipVals,
-                    cctx.deploymentEnabled(),
-                    recovery,
-                    null,
-                    null); // TODO IGNITE-7371
-
-                add(fut); // Append new future.
+                GridNearGetRequest req = miniFuture.createGetRequest(futId);
+
+                add(miniFuture); // Append new future.
 
                 try {
                     cctx.io().send(n, req, cctx.ioPolicy());
@@ -395,16 +301,14 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.
                     if (e instanceof ClusterTopologyCheckedException)
-                        fut.onNodeLeft();
+                        
miniFuture.onNodeLeft((ClusterTopologyCheckedException)e);
                     else
-                        fut.onResult(e);
+                        miniFuture.onResult(e);
                 }
             }
         }
     }
 
-
-
     /**
      * @param mappings Mappings.
      * @param key Key to map.
@@ -415,8 +319,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
      */
     private Map<KeyCacheObject, GridNearCacheEntry> map(
         KeyCacheObject key,
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
         AffinityTopologyVersion topVer,
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
@@ -494,7 +398,9 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                         }
                     }
 
-                    ClusterNode affNode = 
cctx.selectAffinityNodeBalanced(affNodes, part, canRemap);
+                    Set<ClusterNode> invalidNodesSet = getInvalidNodes(part, 
topVer);
+
+                    ClusterNode affNode = 
cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet, part, canRemap);
 
                     if (affNode == null) {
                         onDone(serverNotFoundError(part, topVer));
@@ -505,17 +411,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                     if (cctx.statisticsEnabled() && !skipVals && 
!affNode.isLocal() && !isNear)
                         cache().metrics0().onRead(false);
 
-                    LinkedHashMap<KeyCacheObject, Boolean> keys = 
mapped.get(affNode);
-
-                    if (keys != null && keys.containsKey(key)) {
-                        if (REMAP_CNT_UPD.incrementAndGet(this) > 
MAX_REMAP_CNT) {
-                            onDone(new ClusterTopologyCheckedException("Failed 
to remap key to a new node after " +
-                                MAX_REMAP_CNT + " attempts (key got remapped 
to the same node) " +
-                                "[key=" + key + ", node=" + 
U.toShortString(affNode) + ", mappings=" + mapped + ']'));
-
-                            return saved;
-                        }
-                    }
+                    if (!checkRetryPermits(key,affNode,mapped))
+                        return saved;
 
                     if (!affNodes.contains(cctx.localNode())) {
                         GridNearCacheEntry nearEntry = entry != null ? entry : 
near.entryExx(key, topVer);
@@ -572,10 +469,12 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
      * @param nearRead {@code True} if already tried to read from near cache.
      * @return {@code True} if there is no need to further search value.
      */
-    private boolean localDhtGet(KeyCacheObject key,
+    private boolean localDhtGet(
+        KeyCacheObject key,
         int part,
         AffinityTopologyVersion topVer,
-        boolean nearRead) {
+        boolean nearRead
+    ) {
         GridDhtCacheAdapter<K, V> dht = cache().dht();
 
         assert dht.context().affinityNode() : this;
@@ -795,9 +694,11 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
      * @param saved Saved entries.
      * @param topVer Topology version.
      */
-    private void releaseEvictions(Collection<KeyCacheObject> keys,
+    private void releaseEvictions(
+        Collection<KeyCacheObject> keys,
         Map<KeyCacheObject, GridNearCacheEntry> saved,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer
+    ) {
         for (KeyCacheObject key : keys) {
             GridNearCacheEntry entry = saved.get(key);
 
@@ -812,101 +713,59 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
-            @SuppressWarnings("unchecked")
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    return "[node=" + ((MiniFuture)f).node().id() +
-                        ", loc=" + ((MiniFuture)f).node().isLocal() +
-                        ", done=" + f.isDone() + "]";
-                }
-                else
-                    return "[loc=true, done=" + f.isDone() + "]";
-            }
-        });
-
         return S.toString(GridNearGetFuture.class, this,
-            "innerFuts", futs,
             "super", super.toString());
     }
 
     /**
-     * Mini-future for get operations. Mini-futures are only waiting on a 
single
-     * node as opposed to multiple nodes.
+     * Mini-future for get operations. Mini-futures are only waiting on a 
single node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private ClusterNode node;
-
-        /** Keys. */
-        @GridToStringInclude
-        private LinkedHashMap<KeyCacheObject, Boolean> keys;
-
+    private class MiniFuture extends AbstractMiniFuture {
         /** Saved entry versions. */
-        private Map<KeyCacheObject, GridNearCacheEntry> savedEntries;
-
-        /** Topology version on which this future was mapped. */
-        private AffinityTopologyVersion topVer;
-
-        /** Post processing closure. */
-        private final BackupPostProcessingClosure postProcessingClos;
-
-        /** {@code True} if remapped after node left. */
-        private boolean remapped;
+        private final Map<KeyCacheObject, GridNearCacheEntry> savedEntries;
 
         /**
          * @param node Node.
          * @param keys Keys.
          * @param savedEntries Saved entries.
          * @param topVer Topology version.
-         * @param postProcessingClos Post processing closure.
          */
         MiniFuture(
             ClusterNode node,
             LinkedHashMap<KeyCacheObject, Boolean> keys,
             Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
-            AffinityTopologyVersion topVer,
-            BackupPostProcessingClosure postProcessingClos) {
-            this.node = node;
-            this.keys = keys;
+            AffinityTopologyVersion topVer
+        ) {
+            super(node, keys, topVer);
             this.savedEntries = savedEntries;
-            this.topVer = topVer;
-            this.postProcessingClos = postProcessingClos;
         }
 
-        /**
-         * @return Future ID.
-         */
-        IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public ClusterNode node() {
-            return node;
-        }
-
-        /**
-         * @return Keys.
-         */
-        public Collection<KeyCacheObject> keys() {
-            return keys.keySet();
+        /** {@inheritDoc} */
+        @Override protected GridNearGetRequest createGetRequest0(IgniteUuid 
rootFutId, IgniteUuid futId) {
+            return new GridNearGetRequest(
+                cctx.cacheId(),
+                rootFutId,
+                futId,
+                ver,
+                keys,
+                readThrough,
+                topVer,
+                subjId,
+                taskName == null ? 0 : taskName.hashCode(),
+                expiryPlc != null ? expiryPlc.forCreate() : -1L,
+                expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                true,
+                skipVals,
+                cctx.deploymentEnabled(),
+                recovery,
+                null,
+                null
+            ); // TODO IGNITE-7371
         }
 
-        /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", 
err=" + e + ']');
-
-            // Fail.
-            onDone(e);
+        /** {@inheritDoc} */
+        @Override protected Map<K, V> 
createResultMap(Collection<GridCacheEntryInfo> entries) {
+            return loadEntries(node.id(), keys.keySet(), entries, 
savedEntries, topVer);
         }
 
         /** {@inheritDoc} */
@@ -920,136 +779,6 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                 return false;
         }
 
-        /**
-         */
-        synchronized void onNodeLeft() {
-            if (remapped)
-                return;
-
-            remapped = true;
-
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for 
reply (will retry): " + this);
-
-            // Try getting value from alive nodes.
-            if (!canRemap) {
-                // Remap
-                map(keys.keySet(), F.t(node, keys), topVer);
-
-                onDone(Collections.<K, V>emptyMap());
-            }
-            else {
-                AffinityTopologyVersion updTopVer =
-                    new 
AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, 
cctx.discovery().topologyVersion()));
-
-                
cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(f -> {
-                    try {
-                        // Remap.
-                        map(keys.keySet(), F.t(node, keys), f.get());
-
-                        onDone(Collections.<K, V>emptyMap());
-
-                    }
-                    catch (IgniteCheckedException e) {
-                        GridNearGetFuture.this.onDone(e);
-                    }
-                });
-            }
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        void onResult(final GridNearGetResponse res) {
-            final Collection<Integer> invalidParts = res.invalidPartitions();
-
-            // If error happened on remote node, fail the whole future.
-            if (res.error() != null) {
-                onDone(res.error());
-
-                return;
-            }
-
-            // Remap invalid partitions.
-            if (!F.isEmpty(invalidParts)) {
-                AffinityTopologyVersion rmtTopVer = res.topologyVersion();
-
-                assert rmtTopVer.topologyVersion() != 0;
-
-                if (rmtTopVer.compareTo(topVer) <= 0) {
-                    // Fail the whole get future.
-                    onDone(new IgniteCheckedException("Failed to process 
invalid partitions response (remote node reported " +
-                        "invalid partitions but remote topology version does 
not differ from local) " +
-                        "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", 
invalidParts=" + invalidParts +
-                        ", nodeId=" + node.id() + ']'));
-
-                    return;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Remapping mini get future [invalidParts=" + 
invalidParts + ", fut=" + this + ']');
-
-                if (!canRemap) {
-                    map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
-                        @Override public boolean apply(KeyCacheObject key) {
-                            return 
invalidParts.contains(cctx.affinity().partition(key));
-                        }
-                    }), F.t(node, keys), topVer);
-
-                    postProcessResultAndDone(res);
-
-                    return;
-                }
-
-                // Need to wait for next topology version to remap.
-                IgniteInternalFuture<AffinityTopologyVersion> topFut =  
cctx.shared().exchange().affinityReadyFuture(rmtTopVer);
-
-                topFut.listen(new 
CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void applyx(
-                        IgniteInternalFuture<AffinityTopologyVersion> fut) 
throws IgniteCheckedException {
-                        AffinityTopologyVersion readyTopVer = fut.get();
-
-                        // This will append new futures to compound list.
-                        map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
-                            @Override public boolean apply(KeyCacheObject key) 
{
-                                return 
invalidParts.contains(cctx.affinity().partition(key));
-                            }
-                        }), F.t(node, keys), readyTopVer);
-
-                        postProcessResultAndDone(res);
-                    }
-                });
-            }
-            else
-                postProcessResultAndDone(res);
-
-        }
-
-        /**
-         * Post processes result and done future.
-         *
-         * @param res Response.
-         */
-        private void postProcessResultAndDone(final GridNearGetResponse res){
-            try {
-                postProcessResult(res);
-
-                // It is critical to call onDone after adding futures to 
compound list.
-                onDone(loadEntries(node.id(), keys.keySet(), res.entries(), 
savedEntries, topVer));
-            }
-            catch (Exception ex) {
-                onDone(ex);
-            }
-        }
-
-        /**
-         * @param res Response.
-         */
-        private void postProcessResult(final GridNearGetResponse res) {
-            if (postProcessingClos != null)
-                postProcessingClos.apply(res.entries());
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java
new file mode 100644
index 0000000..71cb4ed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetReadFromBackupFailoverTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Test for getting values on unstable topology with read from backup enabled.
+ * <p>
+ * A background thread keeps reading random keys from a transactional and an atomic cache while
+ * the main thread repeatedly stops and restarts random nodes.
+ */
+public class CacheGetReadFromBackupFailoverTest extends GridCommonAbstractTest {
+    /** IP finder shared by all started nodes. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Tx cache name. */
+    private static final String TX_CACHE = "txCache";
+
+    /** Atomic cache name. */
+    private static final String ATOMIC_CACHE = "atomicCache";
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 50000;
+
+    /** Stop load flag. */
+    private static final AtomicBoolean stop = new AtomicBoolean();
+
+    /** First error reported to the failure handler, if any. */
+    private static final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * @return Grid count.
+     */
+    public int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureHandler(new AbstractFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                // Remember only the first failure and make the load thread stop.
+                err.compareAndSet(null, failureCtx.error());
+                stop.set(true);
+
+                return false;
+            }
+        });
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        CacheConfiguration<Long, Long> txCcfg = new CacheConfiguration<Long, Long>(TX_CACHE)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setReadFromBackup(true);
+
+        CacheConfiguration<Long, Long> atomicCcfg = new CacheConfiguration<Long, Long>(ATOMIC_CACHE)
+            .setAtomicityMode(ATOMIC)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setReadFromBackup(true);
+
+        cfg.setCacheConfiguration(txCcfg, atomicCcfg);
+
+        // Enforce different mac addresses to emulate distributed environment by default.
+        cfg.setUserAttributes(Collections.singletonMap(
+            IgniteNodeAttributes.ATTR_MACS_OVERRIDE, UUID.randomUUID().toString()));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stop.set(false);
+
+        err.set(null);
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Restarts random nodes for 30 seconds while a background thread performs {@code get} and
+     * {@code getAll} operations on both caches, then checks that enough reads succeeded and that
+     * no failure was reported to the failure handler.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailover() throws Exception {
+        Ignite ignite = ignite(0);
+
+        ignite.cluster().active(true);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        fillCache(ignite, TX_CACHE);
+        fillCache(ignite, ATOMIC_CACHE);
+
+        // Index of the node being restarted (-1 if none); the load thread does not pick it.
+        AtomicInteger idx = new AtomicInteger(-1);
+
+        AtomicInteger successGet = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            // ThreadLocalRandom must be obtained and used in the same thread,
+            // so the load thread never touches the generator of the main thread.
+            ThreadLocalRandom rnd0 = ThreadLocalRandom.current();
+
+            while (!stop.get()) {
+                Ignite ig = null;
+
+                while (ig == null) {
+                    int n = rnd0.nextInt(gridCount());
+
+                    if (idx.get() != n) {
+                        try {
+                            ig = ignite(n);
+                        }
+                        catch (IgniteIllegalStateException e) {
+                            // No-op.
+                        }
+                    }
+                }
+
+                try {
+                    if (rnd0.nextBoolean()) {
+                        ig.cache(TX_CACHE).get(rnd0.nextLong(KEYS_CNT));
+                        ig.cache(ATOMIC_CACHE).get(rnd0.nextLong(KEYS_CNT));
+                    }
+                    else {
+                        ig.cache(TX_CACHE).getAll(rnd0.longs(16, 0, KEYS_CNT).boxed().collect(Collectors.toSet()));
+                        ig.cache(ATOMIC_CACHE).getAll(rnd0.longs(16, 0, KEYS_CNT).boxed().collect(Collectors.toSet()));
+                    }
+
+                    successGet.incrementAndGet();
+                }
+                catch (CacheException e) {
+                    // Reads interrupted by a stopping node are expected; anything else is rethrown.
+                    if (!X.hasCause(e, NodeStoppingException.class))
+                        throw e;
+                }
+            }
+        }, "load-thread");
+
+        long startTime = System.currentTimeMillis();
+
+        while (System.currentTimeMillis() - startTime < 30 * 1000L) {
+            int idx0 = idx.get();
+
+            // Bring back the node stopped on the previous iteration.
+            if (idx0 >= 0)
+                startGrid(idx0);
+
+            U.sleep(500);
+
+            int next = rnd.nextInt(gridCount());
+
+            idx.set(next);
+
+            stopGrid(next);
+
+            U.sleep(500);
+        }
+
+        stop.set(true);
+
+        while (true) {
+            try {
+                fut.get(10_000);
+
+                break;
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                // The load thread has not finished yet: log pending cache futures of every node and wait again.
+                for (Ignite i : G.allGrids()) {
+                    IgniteEx ex = (IgniteEx)i;
+
+                    log.info(">>>> " + ex.context().localNodeId());
+
+                    GridCacheMvccManager mvcc = ex.context().cache().context().mvcc();
+
+                    for (GridCacheFuture<?> fut0 : mvcc.activeFutures())
+                        log.info("activeFut - " + fut0);
+
+                    for (GridCacheFuture<?> fut0 : mvcc.atomicFutures())
+                        log.info("atomicFut - " + fut0);
+                }
+            }
+        }
+
+        Assert.assertTrue(String.valueOf(successGet.get()), successGet.get() > 50);
+
+        Throwable e = err.get();
+
+        if (e != null) {
+            log.error("Test failed", e);
+
+            fail("Test failed");
+        }
+    }
+
+    /**
+     * Loads {@link #KEYS_CNT} entries with random values into the given cache.
+     * <p>
+     * Keys are streamed as {@code Long} because the caches are configured as
+     * {@code <Long, Long>} and all reads in this test use {@code Long} keys;
+     * {@code Integer} keys would never match them.
+     *
+     * @param ignite Node used to create the data streamer.
+     * @param cacheName Name of the cache to fill.
+     */
+    private void fillCache(Ignite ignite, String cacheName) {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        try (IgniteDataStreamer<Long, Long> stmr = ignite.dataStreamer(cacheName)) {
+            for (long key = 0; key < KEYS_CNT; key++)
+                stmr.addData(key, rnd.nextLong());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/18aaee03/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 0250d58..e367aad 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -66,6 +66,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTransfor
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeChangingTopologyTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodePartitionsExchangeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheServerNodeConcurrentStart;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetReadFromBackupFailoverTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.CachePartitionPartialCountersMapSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedOptimisticTransactionSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedPreloadRestartSelfTest;
@@ -246,6 +247,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         GridTestUtils.addTestIfNeeded(suite, 
CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CacheTxLoadingConcurrentGridStartSelfTestAllowOverwrite.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridPartitionedBackupLoadSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CacheGetReadFromBackupFailoverTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCachePartitionedLoadCacheSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCachePartitionedEventSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCachePartitionNotLoadedEventSelfTest.class, ignoredTests);

Reply via email to