Repository: ignite Updated Branches: refs/heads/ignite-single-op-get 72f6ac484 -> 4b9eae88c
ignite-single-op-get Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b9eae88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b9eae88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b9eae88 Branch: refs/heads/ignite-single-op-get Commit: 4b9eae88cdafb474b2a2cebe251ca3fd87e9c960 Parents: 72f6ac4 Author: sboikov <[email protected]> Authored: Sun Nov 22 20:04:17 2015 +0300 Committer: sboikov <[email protected]> Committed: Sun Nov 22 20:04:17 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCachePreloader.java | 11 +- .../cache/GridCachePreloaderAdapter.java | 8 +- .../cache/distributed/dht/GridDhtGetFuture.java | 66 +++--- .../dht/GridDhtTransactionalCacheAdapter.java | 8 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 11 +- .../dht/preloader/GridDhtPreloader.java | 45 +--- .../distributed/dht/GridCacheDhtTestUtils.java | 227 ------------------- .../near/GridCacheNearReadersSelfTest.java | 16 +- 10 files changed, 68 insertions(+), 328 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 8b84b0b..d1b85a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -134,20 +134,11 @@ public interface GridCachePreloader { public IgniteInternalFuture<Boolean> rebalanceFuture(); /** - * Requests that preloader sends the request for the key. - * - * @param keys Keys to request. - * @param topVer Topology version, {@code -1} if not required. - * @return Future to complete when all keys are preloaded. - */ - public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer); - - /** * @param keys Keys. * @param topVer Topology. * @return Future. */ - @Nullable public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys, + @Nullable public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer, boolean waitTop); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 508c76c..e938cce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -138,13 +138,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, - AffinityTopologyVersion topVer) { - return new GridFinishedFuture<>(); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys, + @Nullable @Override public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer, boolean waitTop) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 7108da6..08e20c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -208,43 +208,51 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param keys Keys. */ private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); + GridDhtFuture<Object> fut = (GridDhtFuture)cctx.dht().dhtPreloader().request(keys.keySet(), topVer, true); - if (!F.isEmpty(fut.invalidPartitions())) - retries.addAll(fut.invalidPartitions()); + if (fut != null) { + if (!F.isEmpty(fut.invalidPartitions())) + retries.addAll(fut.invalidPartitions()); - add(new GridEmbeddedFuture<>( - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { - if (e != null) { // Check error first. - if (log.isDebugEnabled()) - log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - - onDone(e); - } + add(new GridEmbeddedFuture<>( + new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { + if (e != null) { // Check error first. + if (log.isDebugEnabled()) + log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size()); + onDone(e); + } - // Assign keys to primary nodes. - for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { - int part = cctx.affinity().partition(key.getKey()); + map0(keys); - if (!retries.contains(part)) { - if (!map(key.getKey(), parts)) - retries.add(part); - else - mappedKeys.put(key.getKey(), key.getValue()); - } + // Finish this one. + return Collections.emptyList(); } + }, + fut)); + } + else + map0(keys); + } - // Add new future. - add(getAsync(mappedKeys)); + private void map0(LinkedHashMap<KeyCacheObject, Boolean> keys) { + LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size()); - // Finish this one. - return Collections.emptyList(); - } - }, - fut)); + // Assign keys to primary nodes. + for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { + int part = cctx.affinity().partition(key.getKey()); + + if (!retries.contains(part)) { + if (!map(key.getKey(), parts)) + retries.add(part); + else + mappedKeys.put(key.getKey(), key.getValue()); + } + } + + // Add new future. + add(getAsync(mappedKeys)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index ae24ed1..43f8139 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -374,14 +374,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { - IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : - ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); + IgniteInternalFuture<?> keyFut = F.isEmpty(req.keys()) ? null : + ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion(), false); if (keyFut == null || keyFut.isDone()) processDhtLockRequest0(nodeId, req); else { - keyFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { + keyFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { processDhtLockRequest0(nodeId, req); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ba0f0fd..ab341f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -917,7 +917,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter Collection<KeyCacheObject> keys = entry.getValue(); - IgniteInternalFuture fut0 = cctx.cacheContext(cacheId).preloader().requestEx(keys, + IgniteInternalFuture fut0 = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e8ff79f..12360a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1160,7 +1160,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicUpdateRequest req, final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { - IgniteInternalFuture<?> forceFut = preldr.requestEx(req.keys(), req.topologyVersion(), false); + IgniteInternalFuture<?> forceFut = preldr.request(req.keys(), req.topologyVersion(), false); if (forceFut == null || forceFut.isDone()) updateAllAsyncInternal0(nodeId, req, completionCb); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index b69b42c..8aae7ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -874,6 +874,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param skipStore Skip store flag. * @return Lock future. */ + @SuppressWarnings("unchecked") IgniteInternalFuture<Exception> lockAllAsync( final GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, @@ -891,13 +892,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = (IgniteInternalFuture)ctx.dht().dhtPreloader().request( + keys, + topVer, + true); // Prevent embedded future creation if possible. - if (keyFut.isDone()) { + if (keyFut == null || keyFut.isDone()) { try { // Check for exception. - keyFut.get(); + if (keyFut != null) + keyFut.get(); return lockAllAsync0(cacheCtx, tx, http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 829eb55..48fe04b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -693,7 +693,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> requestEx(Collection<KeyCacheObject> keys, + @Nullable @Override public IgniteInternalFuture<?> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer, boolean waitTop) { IgniteInternalFuture<?> topReadyFut = waitTop ? cctx.affinity().affinityReadyFuturex(topVer) : null; @@ -728,49 +728,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } } - /** - * @param keys Keys to request. - * @return Future for request. - */ - @SuppressWarnings( {"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); - - IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); - - if (startFut.isDone() && topReadyFut == null) - fut.init(); - else { - if (topReadyFut == null) - startFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> syncFut) { - cctx.kernalContext().closure().runLocalSafe( - new GridPlainRunnable() { - @Override public void run() { - fut.init(); - } - }); - } - }); - else { - GridCompoundFuture<Object, Object> compound = new GridCompoundFuture<>(); - - compound.add((IgniteInternalFuture<Object>)startFut); - compound.add((IgniteInternalFuture<Object>)topReadyFut); - - compound.markInitialized(); - - compound.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> syncFut) { - fut.init(); - } - }); - } - } - - return (GridDhtFuture)fut; - } - /** {@inheritDoc} */ @Override public void forcePreload() { demander.forcePreload(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java deleted file mode 100644 index dd46e23..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.internal.CU; - -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; - -/** - * Utility methods for dht preloader testing. - */ -public class GridCacheDhtTestUtils { - /** - * Ensure singleton. - */ - private GridCacheDhtTestUtils() { - // No-op. - } - - /** - * @param dht Cache. - * @param keyCnt Number of test keys to put into cache. - * @throws IgniteCheckedException If failed to prepare. - */ - @SuppressWarnings({"UnusedAssignment", "unchecked"}) - static void prepareKeys(GridDhtCache<Integer, String> dht, int keyCnt) throws IgniteCheckedException { - AffinityFunction aff = dht.context().config().getAffinity(); - - GridCacheConcurrentMap cacheMap; - - try { - Field field = GridCacheAdapter.class.getDeclaredField("map"); - - field.setAccessible(true); - - cacheMap = (GridCacheConcurrentMap)field.get(dht); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get cache map.", e); - } - - GridDhtPartitionTopology top = dht.topology(); - - GridCacheContext ctx = dht.context(); - - for (int i = 0; i < keyCnt; i++) { - KeyCacheObject cacheKey = ctx.toCacheKeyObject(i); - - cacheMap.putEntry(AffinityTopologyVersion.NONE, cacheKey, ctx.toCacheKeyObject("value" + i)); - - dht.preloader().request(Collections.singleton(cacheKey), AffinityTopologyVersion.NONE); - - GridDhtLocalPartition part = top.localPartition(aff.partition(i), false); - - assert part != null; - - part.own(); - } - } - - /** - * @param dht Dht cache. - * @param idx Cache index - */ - static void printDhtTopology(GridDhtCache<Integer, String> dht, int idx) { - final Affinity<Integer> aff = dht.affinity(); - - Ignite ignite = dht.context().grid(); - ClusterNode locNode = ignite.cluster().localNode(); - - GridDhtPartitionTopology top = dht.topology(); - - System.out.println("\nTopology of cache #" + idx + " (" + locNode.id() + ")" + ":"); - System.out.println("----------------------------------"); - - List<Integer> affParts = new LinkedList<>(); - - GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id()); - - if (map != null) - for (int p : map.keySet()) - affParts.add(p); - - Collections.sort(affParts); - - System.out.println("Affinity partitions: " + affParts + "\n"); - - List<GridDhtLocalPartition> locals = new ArrayList<GridDhtLocalPartition>(top.localPartitions()); - - Collections.sort(locals); - - for (final GridDhtLocalPartition part : locals) { - Collection<ClusterNode> partNodes = aff.mapKeyToPrimaryAndBackups(part.id()); - - String ownStr = !partNodes.contains(dht.context().localNode()) ? "NOT AN OWNER" : - F.eqNodes(CU.primary(partNodes), locNode) ? "PRIMARY" : "BACKUP"; - - Collection<Integer> keys = F.viewReadOnly(dht.keySet(), F.<Integer>identity(), new P1<Integer>() { - @Override public boolean apply(Integer k) { - return aff.partition(k) == part.id(); - } - }); - - System.out.println("Local partition: [" + part + "], [owning=" + ownStr + ", keyCnt=" + keys.size() + - ", keys=" + keys + "]"); - } - - System.out.println("\nNode map:"); - - for (Map.Entry<UUID, GridDhtPartitionMap2> e : top.partitionMap(false).entrySet()) { - List<Integer> list = new ArrayList<>(e.getValue().keySet()); - - Collections.sort(list); - - System.out.println("[node=" + e.getKey() + ", parts=" + list + "]"); - } - - System.out.println(""); - } - - /** - * Checks consistency of partitioned cache. - * Any preload processes must be finished before this method call(). - * - * @param dht Dht cache. - * @param idx Cache index. - * @param log Logger. - */ - @SuppressWarnings("unchecked") - static void checkDhtTopology(GridDhtCache<Integer, String> dht, int idx, IgniteLogger log) { - assert dht != null; - assert idx >= 0; - assert log != null; - - log.info("Checking balanced state of cache #" + idx); - - Affinity<Object> aff = (Affinity)dht.affinity(); - - Ignite ignite = dht.context().grid(); - ClusterNode locNode = ignite.cluster().localNode(); - - GridDhtPartitionTopology top = dht.topology(); - - // Expected partitions calculated with affinity function. - // They should be in topology in OWNING state. - Collection<Integer> affParts = new HashSet<>(); - - GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id()); - - if (map != null) - for (int p : map.keySet()) - affParts.add(p); - - if (F.isEmpty(affParts)) - return; - - for (int p : affParts) - assert top.localPartition(p, false) != null : - "Partition does not exist in topology: [cache=" + idx + ", part=" + p + "]"; - - for (GridDhtLocalPartition p : top.localPartitions()) { - assert affParts.contains(p.id()) : - "Invalid local partition: [cache=" + idx + ", part=" + p + ", node partitions=" + affParts + "]"; - - assert p.state() == OWNING : "Invalid partition state [cache=" + idx + ", part=" + p + "]"; - - Collection<ClusterNode> partNodes = aff.mapPartitionToPrimaryAndBackups(p.id()); - - assert partNodes.contains(locNode) : - "Partition affinity nodes does not contain local node: [cache=" + idx + "]"; - } - - // Check keys. - for (GridCacheEntryEx e : dht.entries()) { - GridDhtCacheEntry entry = (GridDhtCacheEntry)e; - - if (!affParts.contains(entry.partition())) - log.warning("Partition of stored entry is obsolete for node: [cache=" + idx + ", entry=" + entry + - ", node partitions=" + affParts + "]"); - - int p = aff.partition(entry.key()); - - if (!affParts.contains(p)) - log.warning("Calculated entry partition is not in node partitions: [cache=" + idx + ", part=" + p + - ", entry=" + entry + ", node partitions=" + affParts + "]"); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b9eae88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java index b4e1ae6..faf8be4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearReadersSelfTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -233,8 +234,19 @@ public class GridCacheNearReadersSelfTest extends GridCommonAbstractTest { List<KeyCacheObject> cacheKeys = F.asList(ctx.toCacheKeyObject(1), ctx.toCacheKeyObject(2)); - ((IgniteKernal)g1).internalCache(null).preloader().request(cacheKeys, new AffinityTopologyVersion(2)).get(); - ((IgniteKernal)g2).internalCache(null).preloader().request(cacheKeys, new AffinityTopologyVersion(2)).get(); + IgniteInternalFuture<?> fut = ((IgniteKernal)g1).internalCache(null).preloader().request(cacheKeys, + new AffinityTopologyVersion(2), + true); + + if (fut != null) + fut.get(); + + fut = ((IgniteKernal)g2).internalCache(null).preloader().request(cacheKeys, + new AffinityTopologyVersion(2), + true); + + if (fut != null) + fut.get(); IgniteCache<Integer, String> cache1 = g1.cache(null); IgniteCache<Integer, String> cache2 = g2.cache(null);
