Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-get 72f6ac484 -> 4b9eae88c


ignite-single-op-get


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b9eae88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b9eae88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b9eae88

Branch: refs/heads/ignite-single-op-get
Commit: 4b9eae88cdafb474b2a2cebe251ca3fd87e9c960
Parents: 72f6ac4
Author: sboikov <[email protected]>
Authored: Sun Nov 22 20:04:17 2015 +0300
Committer: sboikov <[email protected]>
Committed: Sun Nov 22 20:04:17 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |  11 +-
 .../cache/GridCachePreloaderAdapter.java        |   8 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  66 +++---
 .../dht/GridDhtTransactionalCacheAdapter.java   |   8 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  11 +-
 .../dht/preloader/GridDhtPreloader.java         |  45 +---
 .../distributed/dht/GridCacheDhtTestUtils.java  | 227 -------------------
 .../near/GridCacheNearReadersSelfTest.java      |  16 +-
 10 files changed, 68 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 8b84b0b..d1b85a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -134,20 +134,11 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<Boolean> rebalanceFuture();
 
     /**
-     * Requests that preloader sends the request for the key.
-     *
-     * @param keys Keys to request.
-     * @param topVer Topology version, {@code -1} if not required.
-     * @return Future to complete when all keys are preloaded.
-     */
-    public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer);
-
-    /**
      * @param keys Keys.
      * @param topVer Topology.
      * @return Future.
      */
-    @Nullable public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys,
+    @Nullable public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys,
         AffinityTopologyVersion topVer, boolean waitTop);
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 508c76c..e938cce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -138,13 +138,7 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
-        AffinityTopologyVersion topVer) {
-        return new GridFinishedFuture<>();
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys,
+    @Nullable @Override public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys,
         AffinityTopologyVersion topVer, boolean waitTop) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7108da6..08e20c2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -208,43 +208,51 @@ public final class GridDhtGetFuture<K, V> extends 
GridCompoundIdentityFuture<Col
      * @param keys Keys.
      */
     private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) {
-        GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+        GridDhtFuture<Object> fut = (GridDhtFuture)cctx.dht().dhtPreloader().request(keys.keySet(), topVer, true);
 
-        if (!F.isEmpty(fut.invalidPartitions()))
-            retries.addAll(fut.invalidPartitions());
+        if (fut != null) {
+            if (!F.isEmpty(fut.invalidPartitions()))
+                retries.addAll(fut.invalidPartitions());
 
-        add(new GridEmbeddedFuture<>(
-            new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
-                    if (e != null) { // Check error first.
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
-
-                        onDone(e);
-                    }
+            add(new GridEmbeddedFuture<>(
+                new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
+                    @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
+                        if (e != null) { // Check error first.
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
 
-                    LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size());
+                            onDone(e);
+                        }
 
-                    // Assign keys to primary nodes.
-                    for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
-                        int part = cctx.affinity().partition(key.getKey());
+                        map0(keys);
 
-                        if (!retries.contains(part)) {
-                            if (!map(key.getKey(), parts))
-                                retries.add(part);
-                            else
-                                mappedKeys.put(key.getKey(), key.getValue());
-                        }
+                        // Finish this one.
+                        return Collections.emptyList();
                     }
+                },
+                fut));
+        }
+        else
+            map0(keys);
+    }
 
-                    // Add new future.
-                    add(getAsync(mappedKeys));
+    private void map0(LinkedHashMap<KeyCacheObject, Boolean> keys) {
+        LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size());
 
-                    // Finish this one.
-                    return Collections.emptyList();
-                }
-            },
-            fut));
+        // Assign keys to primary nodes.
+        for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
+            int part = cctx.affinity().partition(key.getKey());
+
+            if (!retries.contains(part)) {
+                if (!map(key.getKey(), parts))
+                    retries.add(part);
+                else
+                    mappedKeys.put(key.getKey(), key.getValue());
+            }
+        }
+
+        // Add new future.
+        add(getAsync(mappedKeys));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index ae24ed1..43f8139 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -374,14 +374,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
      * @param req Request.
      */
     protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
-        IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+        IgniteInternalFuture<?> keyFut = F.isEmpty(req.keys()) ? null :
+            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion(), false);
 
         if (keyFut == null || keyFut.isDone())
             processDhtLockRequest0(nodeId, req);
         else {
-            keyFut.listen(new CI1<IgniteInternalFuture<Object>>() {
-                @Override public void apply(IgniteInternalFuture<Object> t) {
+            keyFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
                     processDhtLockRequest0(nodeId, req);
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ba0f0fd..ab341f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -917,7 +917,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
 
             Collection<KeyCacheObject> keys = entry.getValue();
 
-            IgniteInternalFuture fut0 = cctx.cacheContext(cacheId).preloader().requestEx(keys,
+            IgniteInternalFuture fut0 = cctx.cacheContext(cacheId).preloader().request(keys,
                 tx.topologyVersion(),
                 false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e8ff79f..12360a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1160,7 +1160,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         final GridNearAtomicUpdateRequest req,
         final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> 
completionCb
     ) {
-        IgniteInternalFuture<?> forceFut = preldr.requestEx(req.keys(), req.topologyVersion(), false);
+        IgniteInternalFuture<?> forceFut = preldr.request(req.keys(), req.topologyVersion(), false);
 
         if (forceFut == null || forceFut.isDone())
             updateAllAsyncInternal0(nodeId, req, completionCb);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index b69b42c..8aae7ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -874,6 +874,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
      * @param skipStore Skip store flag.
      * @return Lock future.
      */
+    @SuppressWarnings("unchecked")
     IgniteInternalFuture<Exception> lockAllAsync(
         final GridCacheContext<?, ?> cacheCtx,
         @Nullable final GridNearTxLocal tx,
@@ -891,13 +892,17 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        IgniteInternalFuture<Object> keyFut = (IgniteInternalFuture)ctx.dht().dhtPreloader().request(
+            keys,
+            topVer,
+            true);
 
         // Prevent embedded future creation if possible.
-        if (keyFut.isDone()) {
+        if (keyFut == null || keyFut.isDone()) {
             try {
                 // Check for exception.
-                keyFut.get();
+                if (keyFut != null)
+                    keyFut.get();
 
                 return lockAllAsync0(cacheCtx,
                     tx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 829eb55..48fe04b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -693,7 +693,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys,
+    @Nullable @Override public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys,
         AffinityTopologyVersion topVer, boolean waitTop) {
         IgniteInternalFuture<?> topReadyFut = waitTop ? cctx.affinity().affinityReadyFuturex(topVer) : null;
 
@@ -728,49 +728,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         }
     }
 
-    /**
-     * @param keys Keys to request.
-     * @return Future for request.
-     */
-    @SuppressWarnings( {"unchecked", "RedundantCast"})
-    @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> 
keys, AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<?, ?> fut = new 
GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
-
-        IgniteInternalFuture<?> topReadyFut = 
cctx.affinity().affinityReadyFuturex(topVer);
-
-        if (startFut.isDone() && topReadyFut == null)
-            fut.init();
-        else {
-            if (topReadyFut == null)
-                startFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> 
syncFut) {
-                        cctx.kernalContext().closure().runLocalSafe(
-                            new GridPlainRunnable() {
-                                @Override public void run() {
-                                    fut.init();
-                                }
-                            });
-                    }
-                });
-            else {
-                GridCompoundFuture<Object, Object> compound = new 
GridCompoundFuture<>();
-
-                compound.add((IgniteInternalFuture<Object>)startFut);
-                compound.add((IgniteInternalFuture<Object>)topReadyFut);
-
-                compound.markInitialized();
-
-                compound.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> 
syncFut) {
-                        fut.init();
-                    }
-                });
-            }
-        }
-
-        return (GridDhtFuture)fut;
-    }
-
     /** {@inheritDoc} */
     @Override public void forcePreload() {
         demander.forcePreload();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
deleted file mode 100644
index dd46e23..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
-
-/**
- * Utility methods for dht preloader testing.
- */
-public class GridCacheDhtTestUtils {
-    /**
-     * Ensure singleton.
-     */
-    private GridCacheDhtTestUtils() {
-        // No-op.
-    }
-
-    /**
-     * @param dht Cache.
-     * @param keyCnt Number of test keys to put into cache.
-     * @throws IgniteCheckedException If failed to prepare.
-     */
-    @SuppressWarnings({"UnusedAssignment", "unchecked"})
-    static void prepareKeys(GridDhtCache<Integer, String> dht, int keyCnt) 
throws IgniteCheckedException {
-        AffinityFunction aff = dht.context().config().getAffinity();
-
-        GridCacheConcurrentMap cacheMap;
-
-        try {
-            Field field = GridCacheAdapter.class.getDeclaredField("map");
-
-            field.setAccessible(true);
-
-            cacheMap = (GridCacheConcurrentMap)field.get(dht);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get cache map.", e);
-        }
-
-        GridDhtPartitionTopology top = dht.topology();
-
-        GridCacheContext ctx = dht.context();
-
-        for (int i = 0; i < keyCnt; i++) {
-            KeyCacheObject cacheKey = ctx.toCacheKeyObject(i);
-
-            cacheMap.putEntry(AffinityTopologyVersion.NONE, cacheKey, 
ctx.toCacheKeyObject("value" + i));
-
-            dht.preloader().request(Collections.singleton(cacheKey), 
AffinityTopologyVersion.NONE);
-
-            GridDhtLocalPartition part = top.localPartition(aff.partition(i), 
false);
-
-            assert part != null;
-
-            part.own();
-        }
-    }
-
-    /**
-     * @param dht Dht cache.
-     * @param idx Cache index
-     */
-    static void printDhtTopology(GridDhtCache<Integer, String> dht, int idx) {
-        final Affinity<Integer> aff = dht.affinity();
-
-        Ignite ignite = dht.context().grid();
-        ClusterNode locNode = ignite.cluster().localNode();
-
-        GridDhtPartitionTopology top = dht.topology();
-
-        System.out.println("\nTopology of cache #" + idx + " (" + locNode.id() 
+ ")" + ":");
-        System.out.println("----------------------------------");
-
-        List<Integer> affParts = new LinkedList<>();
-
-        GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id());
-
-        if (map != null)
-            for (int p : map.keySet())
-                affParts.add(p);
-
-        Collections.sort(affParts);
-
-        System.out.println("Affinity partitions: " + affParts + "\n");
-
-        List<GridDhtLocalPartition> locals = new 
ArrayList<GridDhtLocalPartition>(top.localPartitions());
-
-        Collections.sort(locals);
-
-        for (final GridDhtLocalPartition part : locals) {
-            Collection<ClusterNode> partNodes = 
aff.mapKeyToPrimaryAndBackups(part.id());
-
-            String ownStr = !partNodes.contains(dht.context().localNode()) ? 
"NOT AN OWNER" :
-                F.eqNodes(CU.primary(partNodes), locNode) ? "PRIMARY" : 
"BACKUP";
-
-            Collection<Integer> keys = F.viewReadOnly(dht.keySet(), 
F.<Integer>identity(), new P1<Integer>() {
-                @Override public boolean apply(Integer k) {
-                    return aff.partition(k) == part.id();
-                }
-            });
-
-            System.out.println("Local partition: [" + part + "], [owning=" + 
ownStr + ", keyCnt=" + keys.size() +
-                ", keys=" + keys + "]");
-        }
-
-        System.out.println("\nNode map:");
-
-        for (Map.Entry<UUID, GridDhtPartitionMap2> e : 
top.partitionMap(false).entrySet()) {
-            List<Integer> list = new ArrayList<>(e.getValue().keySet());
-
-            Collections.sort(list);
-
-            System.out.println("[node=" + e.getKey() + ", parts=" + list + 
"]");
-        }
-
-        System.out.println("");
-    }
-
-    /**
-     * Checks consistency of partitioned cache.
-     * Any preload processes must be finished before this method call().
-     *
-     * @param dht Dht cache.
-     * @param idx Cache index.
-     * @param log Logger.
-     */
-    @SuppressWarnings("unchecked")
-    static void checkDhtTopology(GridDhtCache<Integer, String> dht, int idx, 
IgniteLogger log) {
-        assert dht != null;
-        assert idx >= 0;
-        assert log != null;
-
-        log.info("Checking balanced state of cache #" + idx);
-
-        Affinity<Object> aff = (Affinity)dht.affinity();
-
-        Ignite ignite = dht.context().grid();
-        ClusterNode locNode = ignite.cluster().localNode();
-
-        GridDhtPartitionTopology top = dht.topology();
-
-        // Expected partitions calculated with affinity function.
-        // They should be in topology in OWNING state.
-        Collection<Integer> affParts = new HashSet<>();
-
-        GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id());
-
-        if (map != null)
-            for (int p : map.keySet())
-                affParts.add(p);
-
-        if (F.isEmpty(affParts))
-            return;
-
-        for (int p : affParts)
-            assert top.localPartition(p, false) != null :
-                "Partition does not exist in topology: [cache=" + idx + ", 
part=" + p + "]";
-
-        for (GridDhtLocalPartition p : top.localPartitions()) {
-            assert affParts.contains(p.id()) :
-                "Invalid local partition: [cache=" + idx + ", part=" + p + ", 
node partitions=" + affParts + "]";
-
-            assert p.state() == OWNING : "Invalid partition state [cache=" + 
idx + ", part=" + p + "]";
-
-            Collection<ClusterNode> partNodes = 
aff.mapPartitionToPrimaryAndBackups(p.id());
-
-            assert partNodes.contains(locNode) :
-                "Partition affinity nodes does not contain local node: 
[cache=" + idx + "]";
-        }
-
-        // Check keys.
-        for (GridCacheEntryEx e : dht.entries()) {
-            GridDhtCacheEntry entry = (GridDhtCacheEntry)e;
-
-            if (!affParts.contains(entry.partition()))
-                log.warning("Partition of stored entry is obsolete for node: 
[cache=" + idx + ", entry=" + entry +
-                    ", node partitions=" + affParts + "]");
-
-            int p = aff.partition(entry.key());
-
-            if (!affParts.contains(p))
-                log.warning("Calculated entry partition is not in node 
partitions: [cache=" + idx + ", part=" + p +
-                    ", entry=" + entry + ", node partitions=" + affParts + 
"]");
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
index b4e1ae6..faf8be4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -233,8 +234,19 @@ public class GridCacheNearReadersSelfTest extends 
GridCommonAbstractTest {
 
         List<KeyCacheObject> cacheKeys = F.asList(ctx.toCacheKeyObject(1), 
ctx.toCacheKeyObject(2));
 
-        ((IgniteKernal)g1).internalCache(null).preloader().request(cacheKeys, new AffinityTopologyVersion(2)).get();
-        ((IgniteKernal)g2).internalCache(null).preloader().request(cacheKeys, new AffinityTopologyVersion(2)).get();
+        IgniteInternalFuture<?> fut = ((IgniteKernal)g1).internalCache(null).preloader().request(cacheKeys,
+            new AffinityTopologyVersion(2),
+            true);
+
+        if (fut != null)
+            fut.get();
+
+        fut = ((IgniteKernal)g2).internalCache(null).preloader().request(cacheKeys,
+            new AffinityTopologyVersion(2),
+            true);
+
+        if (fut != null)
+            fut.get();
 
         IgniteCache<Integer, String> cache1 = g1.cache(null);
         IgniteCache<Integer, String> cache2 = g2.cache(null);

Reply via email to