IGNITE-10207 Fixed missed loss policy checks - Fixes #5360. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e4760980 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e4760980 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e4760980 Branch: refs/heads/ignite-10043 Commit: e4760980ab6077ee398965f551fdf8302820ae0e Parents: 665aa95 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Thu Nov 15 19:15:13 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Thu Nov 15 19:15:13 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 19 +- .../cache/distributed/dht/GridDhtGetFuture.java | 40 +- .../distributed/dht/GridDhtGetSingleFuture.java | 41 +- .../dht/GridDhtTopologyFutureAdapter.java | 233 ++++--- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 9 +- .../GridDhtPartitionsExchangeFuture.java | 4 +- .../distributed/near/GridNearGetFuture.java | 2 +- ...CacheResultIsNotNullOnPartitionLossTest.java | 23 +- .../IgniteCachePartitionLossPolicySelfTest.java | 652 +++++++------------ ...ndexingCachePartitionLossPolicySelfTest.java | 2 +- 11 files changed, 483 insertions(+), 544 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 44d067c..53c0bf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2241,9 +2241,14 @@ public class GridCacheContext<K, V> implements Externalizable { * * @param affNodes All affinity nodes. * @param canRemap Flag indicating that 'get' should be done on a locked topology version. + * @param partitionId Partition ID. * @return Affinity node to get key from or {@code null} if there is no suitable alive node. */ - @Nullable public ClusterNode selectAffinityNodeBalanced(List<ClusterNode> affNodes, boolean canRemap) { + @Nullable public ClusterNode selectAffinityNodeBalanced( + List<ClusterNode> affNodes, + int partitionId, + boolean canRemap + ) { if (!readLoadBalancingEnabled) { if (!canRemap) { for (ClusterNode node : affNodes) { @@ -2267,7 +2272,7 @@ public class GridCacheContext<K, V> implements Externalizable { ClusterNode n0 = null; for (ClusterNode node : affNodes) { - if (canRemap || discovery().alive(node)) { + if ((canRemap || discovery().alive(node) && isOwner(node, partitionId))) { if (locMacs.equals(node.attribute(ATTR_MACS))) return node; @@ -2282,6 +2287,16 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * Checks whether the given node owns the given partition. + * @param node Cluster node. + * @param partitionId Partition ID. + * @return {@code True} if the partition state on the node is {@code OWNING}. + */ + private boolean isOwner(ClusterNode node, int partitionId) { + return topology().partitionState(node.id(), partitionId) == OWNING; + } + + /** * Prepare affinity field for builder (if possible). * * @param buider Builder.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 024e262..96d1769 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.ReaderArguments; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; @@ -55,6 +56,11 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.util.Collections.singleton; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; + /** * */ @@ 
-185,6 +191,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * Initializes future. */ void init() { + // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251 GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer); if (fut != null) { @@ -209,14 +216,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return; } - map0(keys); + map0(keys, true); markInitialized(); } }); } else { - map0(keys); + map0(keys, false); markInitialized(); } @@ -257,7 +264,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** * @param keys Keys to map. */ - private void map0(Map<KeyCacheObject, Boolean> keys) { + private void map0(Map<KeyCacheObject, Boolean> keys, boolean forceKeys) { Map<KeyCacheObject, Boolean> mappedKeys = null; // Assign keys to primary nodes. @@ -265,7 +272,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col int part = cctx.affinity().partition(key.getKey()); if (retries == null || !retries.contains(part)) { - if (!map(key.getKey())) { + if (!map(key.getKey(), forceKeys)) { if (retries == null) retries = new HashSet<>(); @@ -309,7 +316,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param key Key. * @return {@code True} if mapped. 
*/ - private boolean map(KeyCacheObject key) { + private boolean map(KeyCacheObject key, boolean forceKeys) { try { int keyPart = cctx.affinity().partition(key); @@ -320,14 +327,31 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (part == null) return false; + if (!forceKeys && part.state() == LOST && !recovery) { + Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id())); + + if (error != null) { + onDone(null, error); + + return false; + } + } + if (parts == null || !F.contains(parts, part.id())) { // By reserving, we make sure that partition won't be unloaded while processed. if (part.reserve()) { - parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1); + if (forceKeys || (part.state() == OWNING || part.state() == LOST)) { + parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1); - parts[parts.length - 1] = part.id(); + parts[parts.length - 1] = part.id(); - return true; + return true; + } + else { + part.release(); + + return false; + } } else return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 88f6848..e0fe8be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import 
org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.ReaderArguments; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; @@ -47,6 +48,11 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.util.Collections.singleton; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; + /** * */ @@ -207,6 +213,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * */ private void map() { + // TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251 if (cctx.group().preloader().needForceKeys()) { GridDhtFuture<Object> fut = cctx.group().preloader().request( cctx, @@ -240,7 +247,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa onDone(e); } else - map0(); + map0(true); } } ); @@ -249,19 +256,20 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa } } - map0(); + map0(false); } /** * */ - private void map0() { + private void map0(boolean forceKeys) { assert retry == null : retry; - if (!map(key)) { + if (!map(key, forceKeys)) { retry = cctx.affinity().partition(key); - 
onDone((GridCacheEntryInfo)null); + if (!isDone()) + onDone((GridCacheEntryInfo)null); return; } @@ -278,7 +286,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * @param key Key. * @return {@code True} if mapped. */ - private boolean map(KeyCacheObject key) { + private boolean map(KeyCacheObject key, boolean forceKeys) { try { int keyPart = cctx.affinity().partition(key); @@ -291,11 +299,28 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa assert this.part == -1; + if (!forceKeys && part.state() == LOST && !recovery) { + Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id())); + + if (error != null) { + onDone(null, error); + + return false; + } + } + // By reserving, we make sure that partition won't be unloaded while processed. if (part.reserve()) { - this.part = part.id(); + if (forceKeys || (part.state() == OWNING || part.state() == LOST)) { + this.part = part.id(); + + return true; + } + else { + part.release(); - return true; + return false; + } } else return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java index 9214308..8a6a5ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java @@ -29,12 +29,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import 
org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL; import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE; /** * @@ -42,7 +43,7 @@ import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion> implements GridDhtTopologyFuture { /** Cache groups validation results. */ - protected volatile Map<Integer, CacheValidation> grpValidRes; + protected volatile Map<Integer, CacheGroupValidation> grpValidRes = Collections.emptyMap(); /** Whether or not cluster is active. */ protected volatile boolean clusterIsActive = true; @@ -52,7 +53,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff * @param topNodes Topology nodes. * @return Validation result. */ - protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) { + protected final CacheGroupValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) { Collection<Integer> lostParts = grp.isLocal() ? 
Collections.<Integer>emptyList() : grp.topology().lostPartitions(); @@ -65,11 +66,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff valid = validator.validate(topNodes); } - return new CacheValidation(valid, lostParts); + return new CacheGroupValidation(valid, lostParts); } /** {@inheritDoc} */ - @Nullable @Override public final Throwable validateCache( + @Override public final @Nullable Throwable validateCache( GridCacheContext cctx, boolean recovery, boolean read, @@ -87,115 +88,181 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff return new CacheInvalidStateException( "Failed to perform cache operation (cluster is not activated): " + cctx.name()); + OperationType opType = read ? OperationType.READ : WRITE; + CacheGroupContext grp = cctx.group(); - PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); + PartitionLossPolicy lossPlc = grp.config().getPartitionLossPolicy(); + + if (cctx.shared().readOnlyMode() && opType == WRITE) + return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)"); if (grp.needsRecovery() && !recovery) { - if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL)) - return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " + - cctx.name()); + if (opType == WRITE && (lossPlc == READ_ONLY_SAFE || lossPlc == READ_ONLY_ALL)) + return new IgniteCheckedException( + "Failed to write to cache (cache is moved to a read-only state): " + cctx.name()); } - if (cctx.shared().readOnlyMode() && !read) - return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)" ); + CacheGroupValidation validation = grpValidRes.get(grp.groupId()); - if (grp.needsRecovery() || grp.topologyValidator() != null) { - CacheValidation validation = grpValidRes.get(grp.groupId()); + if (validation == null) + return null; - if (validation == null) 
- return null; + if (opType == WRITE && !validation.isValid()) { + return new IgniteCheckedException("Failed to perform cache operation " + + "(cache topology is not valid): " + cctx.name()); + } - if (!validation.valid && !read) - return new IgniteCheckedException("Failed to perform cache operation " + - "(cache topology is not valid): " + cctx.name()); + if (recovery) + return null; - if (recovery || !grp.needsRecovery()) - return null; + if (validation.hasLostPartitions()) { + if (key != null) + return LostPolicyValidator.validate(cctx, key, opType, validation.lostPartitions()); - if (key != null) { - int p = cctx.affinity().partition(key); + if (keys != null) + return LostPolicyValidator.validate(cctx, keys, opType, validation.lostPartitions()); + } - CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p, - validation.lostParts, partLossPlc); + return null; + } - if (ex != null) - return ex; - } + /** + * Cache group validation result. + */ + protected static class CacheGroupValidation { + /** Topology validation result. */ + private final boolean valid; - if (keys != null) { - for (Object k : keys) { - int p = cctx.affinity().partition(k); + /** Lost partitions on this topology version. */ + private final Collection<Integer> lostParts; - CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p, - validation.lostParts, partLossPlc); + /** + * @param valid Valid flag. + * @param lostParts Lost partitions. + */ + private CacheGroupValidation(boolean valid, Collection<Integer> lostParts) { + this.valid = valid; + this.lostParts = lostParts; + } - if (ex != null) - return ex; - } - } + /** + * @return True if valid, False if invalid. + */ + public boolean isValid() { + return valid; } - return null; + /** + * @return True if at least one lost partition is present, False otherwise. + */ + public boolean hasLostPartitions() { + return !F.isEmpty(lostParts); + } + + /** + * @return Lost partition ID collection.
+ */ + public Collection<Integer> lostPartitions() { + return lostParts; + } } /** - * @param cacheName Cache name. - * @param read Read flag. - * @param key Key to check. - * @param part Partition this key belongs to. - * @param lostParts Collection of lost partitions. - * @param plc Partition loss policy. - * @return Invalid state exception if this operation is disallowed. + * */ - private CacheInvalidStateException validatePartitionOperation( - String cacheName, - boolean read, - Object key, - int part, - Collection<Integer> lostParts, - PartitionLossPolicy plc - ) { - if (lostParts.contains(part)) { - if (!read) { - assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE; + public enum OperationType { + /** + * Read operation. + */ + READ, + /** + * Write operation. + */ + WRITE + } + + /** + * Lost policy validator. + */ + public static class LostPolicyValidator { + /** + * + */ + public static Throwable validate( + GridCacheContext cctx, + Object key, + OperationType opType, + Collection<Integer> lostParts + ) { + CacheGroupContext grp = cctx.group(); + + PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy(); + + int partition = cctx.affinity().partition(key); + + return validate(cctx, key, partition, opType, lostPlc, lostParts); + } + + /** + * + */ + public static Throwable validate( + GridCacheContext cctx, + Collection<?> keys, + OperationType opType, + Collection<Integer> lostParts + ) { + CacheGroupContext grp = cctx.group(); - if (plc == READ_WRITE_SAFE) { + PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy(); + + for (Object key : keys) { + int partition = cctx.affinity().partition(key); + + Throwable res = validate(cctx, key, partition, opType, lostPlc, lostParts); + + if (res != null) + return res; + } + + return null; + } + + /** + * + */ + private static Throwable validate( + GridCacheContext cctx, + Object key, + int partition, + OperationType opType, + PartitionLossPolicy lostPlc, + Collection<Integer> lostParts + ) 
{ + if (opType == WRITE) { + if (lostPlc == READ_ONLY_SAFE || lostPlc == READ_ONLY_ALL) { + return new IgniteCheckedException( + "Failed to write to cache (cache is moved to a read-only state): " + cctx.name() + ); + } + + if (lostParts.contains(partition) && lostPlc == READ_WRITE_SAFE) { return new CacheInvalidStateException("Failed to execute cache operation " + "(all partition owners have left the grid, partition data has been lost) [" + - "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); + "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']'); } } - else { - // Read. - if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) + + if (opType == OperationType.READ) { + if (lostParts.contains(partition) && (lostPlc == READ_ONLY_SAFE || lostPlc == READ_WRITE_SAFE)) return new CacheInvalidStateException("Failed to execute cache operation " + "(all partition owners have left the grid, partition data has been lost) [" + - "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); + "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']' + ); } - } - - return null; - } - - /** - * Cache validation result. - */ - protected static class CacheValidation { - /** Topology validation result. */ - private boolean valid; - /** Lost partitions on this topology version. */ - private Collection<Integer> lostParts; - - /** - * @param valid Valid flag. - * @param lostParts Lost partitions. 
- */ - private CacheValidation(boolean valid, Collection<Integer> lostParts) { - this.valid = valid; - this.lostParts = lostParts; + return null; } } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 8725e05..2fcd677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -483,7 +483,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } } - ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, canRemap); + ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap); if (node == null) { onDone(serverNotFoundError(part, topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index ad716e6..4d0e129 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -367,7 +367,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec } } - ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap); + ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap); if (affNode == null) { onDone(serverNotFoundError(part, topVer)); @@ -775,6 +775,13 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec private void remap(final AffinityTopologyVersion topVer) { cctx.closures().runLocalSafe(new Runnable() { @Override public void run() { + GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture(); + + Throwable error = lastFut.validateCache(cctx, recovery, true, key, null); + + if (error != null) + onDone(error); + map(topVer); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3702a51..c8471c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2077,10 +2077,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (serverNodeDiscoveryEvent()) + if (serverNodeDiscoveryEvent() || localJoinExchange()) detectLostPartitions(res); - 
Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size()); + Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size()); for (CacheGroupContext grp : cctx.cache().cacheGroups()) m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes())); http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 0350e1a..54c3cae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -494,7 +494,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap); + ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, part, canRemap); if (affNode == null) { onDone(serverNotFoundError(part, topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java index ceafc9e..0958f83 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java @@ -17,15 +17,21 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.EventType; @@ -48,13 +54,13 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** Number of servers to be started. */ - private static final int SERVERS = 10; + private static final int SERVERS = 5; /** Index of node that is goning to be the only client node. */ private static final int CLIENT_IDX = SERVERS; /** Number of cache entries to insert into the test cache. */ - private static final int CACHE_ENTRIES_CNT = 10_000; + private static final int CACHE_ENTRIES_CNT = 60; /** True if {@link #getConfiguration(String)} is expected to configure client node on next invocations. 
*/ private boolean isClient; @@ -75,6 +81,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT .setCacheMode(CacheMode.PARTITIONED) .setBackups(0) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAffinity(new RendezvousAffinityFunction(false, 50)) .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE) ); @@ -90,7 +97,12 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT cleanPersistenceDir(); - startGrids(SERVERS); + List<Integer> list = IntStream.range(0, SERVERS).boxed().collect(Collectors.toList()); + + Collections.shuffle(list); + + for (Integer i : list) + startGrid(i); isClient = true; @@ -178,9 +190,9 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT readerThreadStarted.await(1, TimeUnit.SECONDS); for (int i = 0; i < SERVERS - 1; i++) { - Thread.sleep(50L); - grid(i).close(); + + Thread.sleep(400L); } } finally { @@ -204,6 +216,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT private boolean expectedThrowableClass(Throwable throwable) { return X.hasCause( throwable, + IgniteClientDisconnectedException.class, CacheInvalidStateException.class, ClusterTopologyCheckedException.class, IllegalStateException.class, http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java index caf0829..f02563d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java @@ -18,14 +18,11 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -39,8 +36,6 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; @@ -48,7 +43,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.TestDelayingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.F; @@ -58,18 +53,19 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static java.util.Arrays.asList; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; /** * */ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTest { /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ private boolean client; @@ -78,43 +74,36 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe private PartitionLossPolicy partLossPlc; /** */ - protected static final String CACHE_NAME = "partitioned"; + private int backups; /** */ - private int backups = 0; + private final AtomicBoolean delayPartExchange = new AtomicBoolean(); /** */ - private final AtomicBoolean delayPartExchange = new AtomicBoolean(false); - - /** */ - private final TopologyChanger killSingleNode = new TopologyChanger(false, Collections.singletonList(3), Arrays.asList(0, 1, 2, 4), 0); - - /** */ - private boolean isPersistenceEnabled; + private final TopologyChanger killSingleNode = new TopologyChanger( + false, asList(3), asList(0, 1, 2, 4), 0 + ); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); cfg.setCommunicationSpi(new TestDelayingCommunicationSpi() { + /** {@inheritDoc} */ @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) { - return 
delayPartExchange.get() && (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage); + return delayPartExchange.get() && + (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage); } - @Override protected int delayMillis() { - return 250; - } }); cfg.setClientMode(client); - cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setConsistentId(gridName); - cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setPersistenceEnabled(isPersistenceEnabled) - )); + cfg.setCacheConfiguration(cacheConfiguration()); return cfg; } @@ -123,7 +112,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * @return Cache configuration. */ protected CacheConfiguration<Integer, Integer> cacheConfiguration() { - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(CACHE_NAME); + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); cacheCfg.setCacheMode(PARTITIONED); cacheCfg.setBackups(backups); @@ -135,44 +124,27 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - delayPartExchange.set(false); - - partLossPlc = PartitionLossPolicy.IGNORE; - - backups = 0; - - isPersistenceEnabled = false; + @Override protected void afterTest() throws Exception { + stopAllGrids(); } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); + @Override protected void beforeTest() throws Exception { + super.beforeTest(); cleanPersistenceDir(); - super.afterTest(); - } - - /** - * @throws Exception if failed. 
- */ - public void testReadOnlySafe() throws Exception { - partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE; + delayPartExchange.set(false); - checkLostPartition(false, true, killSingleNode); + backups = 0; } /** * @throws Exception if failed. */ - public void testReadOnlySafeWithPersistence() throws Exception { + public void testReadOnlySafe() throws Exception { partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE; - isPersistenceEnabled = true; - checkLostPartition(false, true, killSingleNode); } @@ -188,19 +160,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ - public void testReadOnlyAllWithPersistence() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10041"); - - partLossPlc = PartitionLossPolicy.READ_ONLY_ALL; - - isPersistenceEnabled = true; - - checkLostPartition(false, false, killSingleNode); - } - - /** - * @throws Exception if failed. - */ public void testReadWriteSafe() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -210,17 +169,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ - public void testReadWriteSafeWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, killSingleNode); - } - - /** - * @throws Exception if failed. - */ public void testReadWriteAll() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_ALL; @@ -230,34 +178,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. 
*/ - public void testReadWriteAllWithPersistence() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10041"); - - partLossPlc = PartitionLossPolicy.READ_WRITE_ALL; - - isPersistenceEnabled = true; - - checkLostPartition(true, false, killSingleNode); - } - - /** - * @throws Exception if failed. - */ public void testReadWriteSafeAfterKillTwoNodes() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); - } - - /** - * @throws Exception if failed. - */ - public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); + checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0)); } /** @@ -266,18 +190,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20)); - } - - /** - * @throws Exception if failed. 
- */ - public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20)); + checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20)); } /** @@ -288,22 +201,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe backups = 1; - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0)); - } - - /** - * @throws Exception if failed. - */ - public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10043"); - - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - backups = 1; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0)); + checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0)); } /** @@ -312,18 +210,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe public void testReadWriteSafeAfterKillCrd() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); - } - - /** - * @throws Exception if failed. 
- */ - public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); + checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0)); } /** @@ -334,20 +221,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe backups = 1; - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); - } - - /** - * @throws Exception if failed. - */ - public void testReadWriteSafeWithBackupsWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - backups = 1; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); + checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0)); } /** @@ -358,95 +232,26 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe backups = 1; - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); - } - - /** - * @throws Exception if failed. - */ - public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception { - partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; - - backups = 1; - - isPersistenceEnabled = true; - - checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); - } - - /** - * @throws Exception if failed. 
- */ - public void testIgnore() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-5078"); - - partLossPlc = PartitionLossPolicy.IGNORE; - - checkIgnore(killSingleNode); + checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0)); } /** + * @param topChanger topology changer. * @throws Exception if failed. */ - public void testIgnoreWithPersistence() throws Exception { + public void testIgnore(TopologyChanger topChanger) throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-5078"); - fail("https://issues.apache.org/jira/browse/IGNITE-10041"); - - partLossPlc = PartitionLossPolicy.IGNORE; - - isPersistenceEnabled = true; - - checkIgnore(killSingleNode); - } - - /** - * @throws Exception if failed. - */ - public void testIgnoreKillThreeNodes() throws Exception { - partLossPlc = PartitionLossPolicy.IGNORE; - - // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078. - // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed. - // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0); - TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0); - - checkIgnore(onlyCrdIsAlive); - } - - /** - * @throws Exception if failed. - */ - public void testIgnoreKillThreeNodesWithPersistence() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10041"); - - partLossPlc = PartitionLossPolicy.IGNORE; - - isPersistenceEnabled = true; - - // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078. - // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed. 
- // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0); - TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0); - - checkIgnore(onlyCrdIsAlive); - } - - /** - * @param topChanger topology changer. - * @throws Exception if failed. - */ - private void checkIgnore(TopologyChanger topChanger) throws Exception { topChanger.changeTopology(); for (Ignite ig : G.allGrids()) { - IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); + IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); Collection<Integer> lost = cache.lostPartitions(); assertTrue("[grid=" + ig.name() + ", lost=" + lost.toString() + ']', lost.isEmpty()); - int parts = ig.affinity(CACHE_NAME).partitions(); + int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions(); for (int i = 0; i < parts; i++) { cache.get(i); @@ -465,127 +270,109 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception { assert partLossPlc != null; - List<Integer> lostParts = topChanger.changeTopology(); - - // Wait for all grids (servers and client) have same topology version - // to make sure that all nodes received map with lost partition. 
- boolean success = GridTestUtils.waitForCondition(() -> { - AffinityTopologyVersion last = null; - for (Ignite ig : G.allGrids()) { - AffinityTopologyVersion ver = ((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion(); - - if (last != null && !last.equals(ver)) - return false; - - last = ver; - } - - return true; - }, 10000); - - assertTrue("Failed to wait for new topology", success); + int part = topChanger.changeTopology().get(0); for (Ignite ig : G.allGrids()) { info("Checking node: " + ig.cluster().localNode().id()); - IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); + IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); - verifyLostPartitions(ig, lostParts); - - verifyCacheOps(canWrite, safe, ig); - - validateQuery(safe, ig); + verifyCacheOps(canWrite, safe, part, ig); - // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041. - if (!isPersistenceEnabled) { - // Check we can read and write to lost partition in recovery mode. - IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover(); + // Check we can read and write to lost partition in recovery mode. + IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover(); - for (int lostPart : recoverCache.lostPartitions()) { - recoverCache.get(lostPart); - recoverCache.put(lostPart, lostPart); - } - - // Check that writing in recover mode does not clear partition state. - verifyLostPartitions(ig, lostParts); + for (int lostPart : recoverCache.lostPartitions()) { + recoverCache.get(lostPart); + recoverCache.put(lostPart, lostPart); + } - verifyCacheOps(canWrite, safe, ig); + // Check that writing in recover mode does not clear partition state. + verifyCacheOps(canWrite, safe, part, ig); - validateQuery(safe, ig); - } + // Validate queries. + validateQuery(safe, ig); } - // Bring all nodes back. 
- for (int i : topChanger.killNodes) { - IgniteEx grd = startGrid(i); + checkNewNode(true, canWrite, safe, part); + checkNewNode(false, canWrite, safe, part); - info("Newly started node: " + grd.cluster().localNode().id()); + // Check that partition state does not change after we start a new node. + IgniteEx grd = startGrid(3); - // Check that partition state does not change after we start each node. - // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044. - if (!isPersistenceEnabled) { - for (Ignite ig : G.allGrids()) { - verifyCacheOps(canWrite, safe, ig); + info("Newly started node: " + grd.cluster().localNode().id()); - // TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057 - // TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058. - // TODO Uncomment after https://issues.apache.org/jira/browse/IGNITE-10058 is fixed. -// validateQuery(safe, ig); - } - } - } + for (Ignite ig : G.allGrids()) + verifyCacheOps(canWrite, safe, part, ig); - ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME)); + ignite(4).resetLostPartitions(Collections.singletonList(DEFAULT_CACHE_NAME)); awaitPartitionMapExchange(true, true, null); for (Ignite ig : G.allGrids()) { - IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); + IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); assertTrue(cache.lostPartitions().isEmpty()); - int parts = ig.affinity(CACHE_NAME).partitions(); + int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions(); for (int i = 0; i < parts; i++) { cache.get(i); cache.put(i, i); } - - for (int i = 0; i < parts; i++) { - checkQueryPasses(ig, false, i); - - if (shouldExecuteLocalQuery(ig, i)) - checkQueryPasses(ig, true, i); - - } - - checkQueryPasses(ig, false); } } /** - * @param node Node. 
- * @param lostParts Lost partition IDs. + * @param client Client flag. + * @param canWrite Can write flag. + * @param safe Safe flag. + * @param part List of lost partitions. + * @throws Exception If failed to start a new node. */ - private void verifyLostPartitions(Ignite node, List<Integer> lostParts) { - IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME); + private void checkNewNode( + boolean client, + boolean canWrite, + boolean safe, + int part + ) throws Exception { + this.client = client; + + try { + IgniteEx cl = startGrid("newNode"); + + CacheGroupContext grpCtx = cl.context().cache().cacheGroup(cacheId(DEFAULT_CACHE_NAME)); + + assertTrue(grpCtx.needsRecovery()); - Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions()); - Set<Integer> expSortedLostParts = new TreeSet<>(lostParts); + verifyCacheOps(canWrite, safe, part, cl); - assertEqualsCollections(expSortedLostParts, actualSortedLostParts); + validateQuery(safe, cl); + } + finally { + stopGrid("newNode", false); + + this.client = false; + } } /** + * * @param canWrite {@code True} if writes are allowed. * @param safe {@code True} if lost partition should trigger exception. + * @param part Lost partition ID. * @param ig Ignite instance. */ - private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) { - IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); + private void verifyCacheOps(boolean canWrite, boolean safe, int part, Ignite ig) { + IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); - int parts = ig.affinity(CACHE_NAME).partitions(); + Collection<Integer> lost = cache.lostPartitions(); + + assertTrue("Failed to find expected lost partition [exp=" + part + ", lost=" + lost + ']', + lost.contains(part)); + + int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions(); // Check read. 
for (int i = 0; i < parts; i++) { @@ -632,8 +419,8 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * @param nodes List of nodes to find partition. * @return List of partitions that aren't primary or backup for specified nodes. */ - private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) { - Affinity<Object> aff = ignite(4).affinity(CACHE_NAME); + protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) { + Affinity<Object> aff = ignite(4).affinity(DEFAULT_CACHE_NAME); List<Integer> parts = new ArrayList<>(); @@ -657,6 +444,127 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe return parts; } + /** */ + private class TopologyChanger { + /** Flag to delay partition exchange */ + private boolean delayExchange; + + /** List of nodes to kill */ + private List<Integer> killNodes; + + /** List of nodes to be alive */ + private List<Integer> aliveNodes; + + /** Delay between node stops */ + private long stopDelay; + + /** + * @param delayExchange Flag for delay partition exchange. + * @param killNodes List of nodes to kill. + * @param aliveNodes List of nodes to be alive. + * @param stopDelay Delay between stopping nodes. + */ + public TopologyChanger( + boolean delayExchange, + List<Integer> killNodes, + List<Integer> aliveNodes, + long stopDelay + ) { + this.delayExchange = delayExchange; + this.killNodes = killNodes; + this.aliveNodes = aliveNodes; + this.stopDelay = stopDelay; + } + + /** + * @return Lost partition ID. + * @throws Exception If failed. 
+ */ + protected List<Integer> changeTopology() throws Exception { + startGrids(4); + + Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME); + + for (int i = 0; i < aff.partitions(); i++) + ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i); + + client = true; + + startGrid(4); + + client = false; + + for (int i = 0; i < 5; i++) + info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']'); + + awaitPartitionMapExchange(); + + final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes); + + if (parts.isEmpty()) + throw new IllegalStateException("No partition on nodes: " + killNodes); + + final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>(); + + for (int i : aliveNodes) { + HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>(); + + for (Integer part : parts) + semaphoreMap.put(part, new Semaphore(0)); + + lostMap.add(semaphoreMap); + + grid(i).events().localListen(new P1<Event>() { + @Override public boolean apply(Event evt) { + assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; + + CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt; + + if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) { + if (semaphoreMap.containsKey(cacheEvt.partition())) + semaphoreMap.get(cacheEvt.partition()).release(); + } + + return true; + } + }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); + } + + if (delayExchange) + delayPartExchange.set(true); + + ExecutorService executor = Executors.newFixedThreadPool(killNodes.size()); + + for (Integer node : killNodes) { + executor.submit(new Runnable() { + @Override public void run() { + grid(node).close(); + } + }); + + Thread.sleep(stopDelay); + } + + executor.shutdown(); + + delayPartExchange.set(false); + + Thread.sleep(5_000L); + + for (Map<Integer, Semaphore> map : lostMap) { + for (Map.Entry<Integer, Semaphore> entry : map.entrySet()) + assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1)); + } + + 
for (Map<Integer, Semaphore> map : lostMap) { + for (Map.Entry<Integer, Semaphore> entry : map.entrySet()) + assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1)); + } + + return parts; + } + } + /** * Validate query execution on a node. * @@ -665,7 +573,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe */ private void validateQuery(boolean safe, Ignite node) { // Get node lost and remaining partitions. - IgniteCache<?, ?> cache = node.cache(CACHE_NAME); + IgniteCache<?, ?> cache = node.cache(DEFAULT_CACHE_NAME); Collection<Integer> lostParts = cache.lostPartitions(); @@ -673,7 +581,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe Integer remainingPart = null; - for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) { + for (int i = 0; i < node.affinity(DEFAULT_CACHE_NAME).partitions(); i++) { if (lostParts.contains(i)) continue; @@ -730,7 +638,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe int numOfPrimaryParts = 0; - for (int nodePrimaryPart : node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) { + for (int nodePrimaryPart : node.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node.cluster().localNode())) { for (int part : parts) { if (part == nodePrimaryPart) numOfPrimaryParts++; @@ -754,7 +662,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe if (loc) return; - IgniteCache cache = node.cache(CACHE_NAME); + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); ScanQuery qry = new ScanQuery(); @@ -777,124 +685,4 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe // TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed. // No-op. 
} - - /** */ - private class TopologyChanger { - /** Flag to delay partition exchange */ - private boolean delayExchange; - - /** List of nodes to kill */ - private List<Integer> killNodes; - - /** List of nodes to be alive */ - private List<Integer> aliveNodes; - - /** Delay between node stops */ - private long stopDelay; - - /** - * @param delayExchange Flag for delay partition exchange. - * @param killNodes List of nodes to kill. - * @param aliveNodes List of nodes to be alive. - * @param stopDelay Delay between stopping nodes. - */ - private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes, - long stopDelay) { - this.delayExchange = delayExchange; - this.killNodes = killNodes; - this.aliveNodes = aliveNodes; - this.stopDelay = stopDelay; - } - - /** - * @return Lost partition ID. - * @throws Exception If failed. - */ - private List<Integer> changeTopology() throws Exception { - startGrids(4); - - if (isPersistenceEnabled) - grid(0).cluster().active(true); - - Affinity<Object> aff = ignite(0).affinity(CACHE_NAME); - - for (int i = 0; i < aff.partitions(); i++) - ignite(0).cache(CACHE_NAME).put(i, i); - - client = true; - - startGrid(4); - - client = false; - - for (int i = 0; i < 5; i++) - info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']'); - - awaitPartitionMapExchange(); - - final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes); - - if (parts.isEmpty()) - throw new IllegalStateException("No partition on nodes: " + killNodes); - - final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>(); - - for (int i : aliveNodes) { - HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>(); - - for (Integer part : parts) - semaphoreMap.put(part, new Semaphore(0)); - - lostMap.add(semaphoreMap); - - grid(i).events().localListen(new P1<Event>() { - @Override public boolean apply(Event evt) { - assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; - - 
CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt; - - if (F.eq(CACHE_NAME, cacheEvt.cacheName())) { - if (semaphoreMap.containsKey(cacheEvt.partition())) - semaphoreMap.get(cacheEvt.partition()).release(); - } - - return true; - } - }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); - } - - if (delayExchange) - delayPartExchange.set(true); - - ExecutorService executor = Executors.newFixedThreadPool(killNodes.size()); - - for (Integer node : killNodes) { - executor.submit(new Runnable() { - @Override public void run() { - grid(node).close(); - } - }); - - Thread.sleep(stopDelay); - } - - executor.shutdown(); - - delayPartExchange.set(false); - - Thread.sleep(5_000L); - - for (Map<Integer, Semaphore> map : lostMap) { - for (Map.Entry<Integer, Semaphore> entry : map.entrySet()) - assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1)); - } - - for (Map<Integer, Semaphore> map : lostMap) { - for (Map.Entry<Integer, Semaphore> entry : map.entrySet()) - assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1)); - } - - return parts; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e4760980/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java index a31a1c6..7007499 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java @@ -69,7 +69,7 @@ public 
class IndexingCachePartitionLossPolicySelfTest extends IgniteCachePartiti * @param loc Local flag. */ private static void executeQuery(Ignite node, boolean loc, int... parts) { - IgniteCache cache = node.cache(CACHE_NAME); + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); SqlFieldsQuery qry = new SqlFieldsQuery("SELECT * FROM Integer");