Repository: ignite Updated Branches: refs/heads/ignite-2.5 ba4e33706 -> 25b25f271
IGNITE-8122 Restore partition state from WAL if no checkpoints are done - Fixes #3745. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25b25f27 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25b25f27 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25b25f27 Branch: refs/heads/ignite-2.5 Commit: 25b25f271bc89d77013e1cda2bad30d941e1c8ad Parents: ba4e337 Author: Pavel Kovalenko <[email protected]> Authored: Wed Apr 18 15:57:45 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Apr 18 16:04:18 2018 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignment.java | 4 +- .../processors/cache/ExchangeActions.java | 2 +- .../dht/GridDhtPartitionTopology.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 220 +++++++++---------- .../dht/GridDhtPartitionsStateValidator.java | 61 ++++- .../GridDhtPartitionsExchangeFuture.java | 6 +- .../GridDhtPartitionsSingleMessage.java | 18 +- .../dht/preloader/GridDhtPreloader.java | 4 +- .../GridCacheDatabaseSharedManager.java | 89 +++----- .../distributed/CacheBaselineTopologyTest.java | 16 +- ...CacheLoadingConcurrentGridStartSelfTest.java | 3 + .../GridCachePartitionsStateValidationTest.java | 4 + ...idCachePartitionsStateValidatorSelfTest.java | 47 ++-- .../testsuites/IgniteStandByClusterSuite.java | 2 +- .../IgniteCacheQueryNodeRestartSelfTest.java | 2 + ...ngBaselineCacheQueryNodeRestartSelfTest.java | 4 +- 16 files changed, 258 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 6da6aaa..cbec1a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -149,7 +149,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable * @param part Partition. * @return Affinity nodes. */ - public List<ClusterNode> get(int part) { + @Override public List<ClusterNode> get(int part) { assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + " [part=" + part + ", partitions=" + assignment.size() + ']'; @@ -162,7 +162,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable * @param part Partition. * @return Affinity nodes IDs. */ - public HashSet<UUID> getIds(int part) { + @Override public HashSet<UUID> getIds(int part) { assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + " [part=" + part + ", partitions=" + assignment.size() + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index bcf3f40..c289b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -409,7 +409,7 @@ public class ExchangeActions { /** * */ - static class CacheGroupActionData { + public static class CacheGroupActionData { /** */ private final CacheGroupDescriptor desc; http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 6f68dbb..d586a94d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -386,8 +386,9 @@ public interface GridDhtPartitionTopology { * State of all current owners that aren't contained in the set will be reset to MOVING. * * @param p Partition ID. - * @param updateSeq If should increment sequence when updated. * @param owners Set of new owners. + * @param haveHistory {@code True} if there is WAL history to rebalance given partition. + * @param updateSeq If should increment sequence when updated. * @return Set of node IDs that should reload partitions. */ public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 740903e..164f0bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,8 +30,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -43,7 +40,6 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -55,8 +51,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; @@ -324,7 +318,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); - needRefresh = initPartitions0(affVer, exchFut, updateSeq); + needRefresh = initPartitions(affVer, grp.affinity().readyAssignments(affVer), exchFut, updateSeq); consistencyCheck(); } @@ -340,14 +334,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * Creates and initializes partitions using given {@code affVer} and {@code affAssignment}. + * * @param affVer Affinity version to use. + * @param affAssignment Affinity assignment to use. * @param exchFut Exchange future. * @param updateSeq Update sequence. * @return {@code True} if partitions must be refreshed. */ - private boolean initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - List<List<ClusterNode>> aff = grp.affinity().readyAssignments(affVer); - + private boolean initPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> affAssignment, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { boolean needRefresh = false; if (grp.affinityNode()) { @@ -357,32 +352,24 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - assert grp.affinity().lastVersion().equals(affVer) : - "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", affVer=" + affVer + - ", fut=" + exchFut + ']'; - int num = grp.affinity().partitions(); if (grp.rebalanceEnabled()) { boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || exchFut.activateCluster(); if (first) { - assert exchId.isJoined() || added; + assert exchId.isJoined() || added || exchFut.activateCluster(); for (int p = 0; p < num; p++) { - if (localNode(p, aff) || initLocalPartition(p, discoCache)) { - GridDhtLocalPartition locPart = createPartition(p); + if (localNode(p, affAssignment)) { + // Partition is created first time, so it's safe to own it. + boolean shouldOwn = locParts.get(p) == null; - if (grp.persistenceEnabled()) { - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)grp.shared().database(); + GridDhtLocalPartition locPart = getOrCreatePartition(p); - locPart.restoreState(db.readPartitionState(grp, locPart.id())); - } - else { + if (shouldOwn) { boolean owned = locPart.own(); assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + @@ -390,7 +377,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Owned partition for oldest node [grp=" + grp.cacheOrGroupName() + - ", part=" + locPart + ']'); + ", part=" + locPart + ']'); } needRefresh = true; @@ -400,15 +387,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } else - createPartitions(affVer, aff, updateSeq); + createPartitions(affVer, affAssignment, updateSeq); } else { // If preloader is disabled, then we simply clear out // the partitions this node is not responsible for. for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true, false); + GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true); - boolean belongs = localNode(p, aff); + boolean belongs = localNode(p, affAssignment); if (locPart != null) { if (!belongs) { @@ -429,7 +416,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart.own(); } else if (belongs) { - locPart = createPartition(p); + locPart = getOrCreatePartition(p); locPart.own(); @@ -439,27 +426,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - updateRebalanceVersion(aff); + updateRebalanceVersion(affAssignment); return needRefresh; } /** - * @param p Partition ID to restore. - * @param discoCache Disco cache to use. - * @return {@code True} if should restore local partition. - */ - private boolean initLocalPartition(int p, DiscoCache discoCache) { - IgnitePageStoreManager storeMgr = ctx.pageStore(); - - return - grp.persistenceEnabled() && - storeMgr instanceof FilePageStoreManager && - discoCache.baselineNode(ctx.localNodeId()) && - Files.exists(((FilePageStoreManager)storeMgr).getPath(grp.sharedGroup(), grp.cacheOrGroupName(), p)); - } - - /** + * Creates non-existing partitions belong to given affinity {@code aff}. + * * @param affVer Affinity version. * @param aff Affinity assignments. * @param updateSeq Update sequence. @@ -475,7 +449,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (localNode(p, aff)) { // This will make sure that all non-existing partitions // will be created in MOVING state. - GridDhtLocalPartition locPart = createPartition(p); + GridDhtLocalPartition locPart = getOrCreatePartition(p); updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); } @@ -483,7 +457,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // If this node's map is empty, we pre-create local partitions, // so local map will be sent correctly during exchange. else if (localNode(p, aff)) - createPartition(p); + getOrCreatePartition(p); } } @@ -588,18 +562,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (grpStarted || exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || exchFut.serverNodeDiscoveryEvent()) { + + AffinityTopologyVersion affVer; + List<List<ClusterNode>> affAssignment; + if (affReady) { - assert grp.affinity().lastVersion().equals(evts.topologyVersion()); + affVer = evts.topologyVersion(); - initPartitions0(evts.topologyVersion(), exchFut, updateSeq); + assert grp.affinity().lastVersion().equals(affVer) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + + ", affVer=" + affVer + + ", fut=" + exchFut + ']'; + + affAssignment = grp.affinity().readyAssignments(affVer); } else { assert !exchFut.context().mergeExchanges(); - List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); - - createPartitions(exchFut.initialVersion(), aff, updateSeq); + affVer = exchFut.initialVersion(); + affAssignment = grp.affinity().idealAssignment(); } + + initPartitions(affVer, affAssignment, exchFut, updateSeq); } } @@ -695,19 +680,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true); if (partitionLocalNode(p, topVer)) { - // This partition will be created during next topology event, - // which obviously has not happened at this point. - if (locPart == null) { - if (log.isDebugEnabled()) { - log.debug("Skipping local partition afterExchange (will not create) [" + - "grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); - } - - continue; - } + // Prepare partition to rebalance if it's not happened on full map update phase. + if (locPart == null || locPart.state() == RENTING || locPart.state() == EVICTED) + locPart = rebalancePartition(p, false); GridDhtPartitionState state = locPart.state(); @@ -793,20 +771,23 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { - return localPartition0(p, topVer, create, false, true); + return localPartition0(p, topVer, create, false); } /** {@inheritDoc} */ @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { - return localPartition0(p, topVer, create, showRenting, true); + return localPartition0(p, topVer, create, showRenting); } /** + * Creates partition with id {@code p} if it doesn't exist or evicted. + * In other case returns existing partition. + * * @param p Partition number. * @return Partition. */ - private GridDhtLocalPartition createPartition(int p) { + private GridDhtLocalPartition getOrCreatePartition(int p) { assert lock.isWriteLockedByCurrentThread(); assert ctx.database().checkpointLockIsHeldByThread(); @@ -865,16 +846,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param p Partition number. * @param topVer Topology version. - * @param create Create flag. - * @param updateSeq Update sequence. + * @param create If {@code true} create partition if it doesn't exists or evicted. + * @param showRenting If {@code true} return partition in RENTING state if exists. * @return Local partition. */ @SuppressWarnings("TooBroadScope") private GridDhtLocalPartition localPartition0(int p, AffinityTopologyVersion topVer, boolean create, - boolean showRenting, - boolean updateSeq) { + boolean showRenting) { GridDhtLocalPartition loc; loc = locParts.get(p); @@ -1345,6 +1325,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (incomeCntrMap != null) { // update local counters in partitions for (int i = 0; i < locParts.length(); i++) { + cntrMap.updateCounter(i, incomeCntrMap.updateCounter(i)); + GridDhtLocalPartition part = locParts.get(i); if (part == null) @@ -1511,51 +1493,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } else if (state == MOVING) { - GridDhtLocalPartition locPart = locParts.get(p); - - if (!partsToReload.contains(p)) { - if (locPart == null || locPart.state() == EVICTED) - locPart = createPartition(p); - - if (locPart.state() == OWNING) { - locPart.moving(); + boolean haveHistory = !partsToReload.contains(p); - changed = true; - } - } - else { - if (locPart == null || locPart.state() == EVICTED) { - createPartition(p); + rebalancePartition(p, haveHistory); - changed = true; - } - else if (locPart.state() == OWNING || locPart.state() == MOVING) { - if (locPart.state() == OWNING) - locPart.moving(); - locPart.clearAsync(); - - changed = true; - } - else if (locPart.state() == RENTING) { - // Try to prevent partition eviction. - if (locPart.reserve()) { - try { - locPart.moving(); - locPart.clearAsync(); - } finally { - locPart.release(); - } - } - // In other case just recreate it. - else { - assert locPart.state() == EVICTED; - - createPartition(p); - } - - changed = true; - } - } + changed = true; } } } @@ -2113,7 +2055,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) { + @Override public Set<UUID> setOwners(int p, Set<UUID> ownersByUpdCounters, boolean haveHistory, boolean updateSeq) { Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>(); ctx.database().checkpointReadLock(); @@ -2125,17 +2067,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) { - locPart.moving(); + if (locPart.state() == OWNING && !ownersByUpdCounters.contains(ctx.localNodeId())) { + rebalancePartition(p, haveHistory); - if (!haveHistory) { - locPart.clearAsync(); + if (!haveHistory) result.add(ctx.localNodeId()); - } U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + "[nodeId=" + ctx.localNodeId() + ", grp=" + grp.cacheOrGroupName() + - ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]"); + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } } @@ -2146,7 +2086,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!partMap.containsKey(p)) continue; - if (partMap.get(p) == OWNING && !owners.contains(remoteNodeId)) { + if (partMap.get(p) == OWNING && !ownersByUpdCounters.contains(remoteNodeId)) { partMap.put(p, MOVING); if (!haveHistory) @@ -2180,6 +2120,42 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * Prepares given partition {@code p} for rebalance. + * Changes partition state to MOVING and starts clearing if needed. + * Prevents ongoing renting if required. + * + * @param p Partition id. + * @param haveHistory If {@code true} there is WAL history to rebalance partition, + * in other case partition will be cleared for full rebalance. + */ + private GridDhtLocalPartition rebalancePartition(int p, boolean haveHistory) { + GridDhtLocalPartition part = getOrCreatePartition(p); + + // Prevent renting. + if (part.state() == RENTING) { + if (part.reserve()) { + part.moving(); + part.release(); + } + else { + assert part.state() == EVICTED : part; + + part = getOrCreatePartition(p); + } + } + + if (part.state() != MOVING) + part.moving(); + + if (!haveHistory) + part.clearAsync(); + + assert part.state() == MOVING : part; + + return part; + } + + /** * Finds local partitions which don't belong to affinity and runs eviction process for such partitions. * * @param updateSeq Update sequence. http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java index 92a0584..cc0542c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.lang.IgniteProductVersion; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -71,9 +72,9 @@ public class GridDhtPartitionsStateValidator { public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut, GridDhtPartitionTopology top, Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException { - // Ignore just joined nodes. final Set<UUID> ignoringNodes = new HashSet<>(); + // Ignore just joined nodes. for (DiscoveryEvent evt : fut.events().events()) if (evt.type() == EVT_NODE_JOINED) ignoringNodes.add(evt.eventNode().id()); @@ -99,6 +100,46 @@ public class GridDhtPartitionsStateValidator { } /** + * Checks what partitions from given {@code singleMsg} message should be excluded from validation. + * + * @param top Topology to validate. + * @param nodeId Node which sent single message. + * @param singleMsg Single message. + * @return Set of partition ids should be excluded from validation. + */ + @Nullable private Set<Integer> shouldIgnore(GridDhtPartitionTopology top, UUID nodeId, GridDhtPartitionsSingleMessage singleMsg) { + CachePartitionPartialCountersMap countersMap = singleMsg.partitionUpdateCounters(top.groupId(), top.partitions()); + Map<Integer, Long> sizesMap = singleMsg.partitionSizes(top.groupId()); + + Set<Integer> ignore = null; + + for (int p = 0; p < top.partitions(); p++) { + if (top.partitionState(nodeId, p) != GridDhtPartitionState.OWNING) { + if (ignore == null) + ignore = new HashSet<>(); + + ignore.add(p); + + continue; + } + + int partIdx = countersMap.partitionIndex(p); + long updateCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0; + long size = sizesMap.containsKey(p) ? sizesMap.get(p) : 0; + + // Do not validate partitions with zero update counter and size. + if (updateCounter == 0 && size == 0) { + if (ignore == null) + ignore = new HashSet<>(); + + ignore.add(p); + } + } + + return ignore; + } + + /** * Validate partitions update counters for given {@code top}. * * @param top Topology to validate. @@ -117,7 +158,10 @@ public class GridDhtPartitionsStateValidator { // Populate counters statistics from local node partitions. for (GridDhtLocalPartition part : top.currentLocalPartitions()) { - if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING) + if (part.state() != GridDhtPartitionState.OWNING) + continue; + + if (part.updateCounter() == 0 && part.fullSize() == 0) continue; updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter())); @@ -133,8 +177,10 @@ public class GridDhtPartitionsStateValidator { CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions); + Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); + for (int part = 0; part < partitions; part++) { - if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING) + if (ignorePartitions != null && ignorePartitions.contains(part)) continue; int partIdx = countersMap.partitionIndex(part); @@ -166,7 +212,10 @@ public class GridDhtPartitionsStateValidator { // Populate sizes statistics from local node partitions. for (GridDhtLocalPartition part : top.currentLocalPartitions()) { - if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING) + if (part.state() != GridDhtPartitionState.OWNING) + continue; + + if (part.updateCounter() == 0 && part.fullSize() == 0) continue; sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize())); @@ -182,8 +231,10 @@ public class GridDhtPartitionsStateValidator { Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId()); + Set<Integer> ignorePartitions = shouldIgnore(top, nodeId, e.getValue()); + for (int part = 0; part < partitions; part++) { - if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING) + if (ignorePartitions != null && ignorePartitions.contains(part)) continue; long currentSize = sizesMap.containsKey(part) ? sizesMap.get(part) : 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index af5acd6..0d57d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -2244,6 +2245,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Collects and determines new owners of partitions for all nodes for given {@code top}. + * * @param top Topology to assign. */ private void assignPartitionStates(GridDhtPartitionTopology top) { @@ -2585,6 +2588,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte detectLostPartitions(resTopVer); } + // Recalculate new affinity based on partitions availability. if (!exchCtx.mergeExchanges() && forceAffReassignment) idealAffDiff = cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this); @@ -2768,7 +2772,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte validator.validatePartitionCountersAndSizes(this, top, msgs); } catch (IgniteCheckedException ex) { - log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex); + log.warning("Partition states validation has failed for group: " + grpDesc.cacheOrGroupName() + ". " + ex.getMessage()); // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 } } http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 6ebafac..b60070e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -73,7 +73,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes private Map<Integer, Map<Integer, Long>> partSizes; /** Serialized partitions counters. */ - private byte[] partSizesBytes; + private byte[] partsSizesBytes; /** Partitions history reservation counters. */ @GridToStringInclude @@ -324,7 +324,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || (partHistCntrs != null && partHistCntrsBytes == null) || - (partSizes != null && partSizesBytes == null) || + (partSizes != null && partsSizesBytes == null) || (err != null && errBytes == null); if (marshal) { @@ -343,7 +343,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partHistCntrs != null && partHistCntrsBytes == null) partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); - if (partSizes != null && partSizesBytes == null) + if (partSizes != null && partsSizesBytes == null) partSizesBytes0 = U.marshal(ctx, partSizes); if (err != null && errBytes == null) @@ -375,7 +375,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; partHistCntrsBytes = partHistCntrsBytes0; - partSizesBytes = partSizesBytes0; + partsSizesBytes = partSizesBytes0; errBytes = errBytes0; } } @@ -405,11 +405,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (partSizesBytes != null && partSizes == null) { + if (partsSizesBytes != null && partSizes == null) { if (compressed()) - partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - partSizes = U.unmarshal(ctx, partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + partSizes = U.unmarshal(ctx, partsSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } if (errBytes != null && err == null) { @@ -504,7 +504,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 13: - if (!writer.writeByteArray("partsSizesBytes", partSizesBytes)) + if (!writer.writeByteArray("partsSizesBytes", partsSizesBytes)) return false; writer.incrementState(); @@ -589,7 +589,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 13: - partSizesBytes = reader.readByteArray("partsSizesBytes"); + partsSizesBytes = reader.readByteArray("partsSizesBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 6ec6ad3..ddcb81e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -202,7 +202,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { // If partition belongs to local node. if (aff.get(p).contains(ctx.localNode())) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true, true); + GridDhtLocalPartition part = top.localPartition(p); assert part != null; assert part.id() == p; @@ -226,7 +226,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { part = top.localPartition(p, topVer, true); } - assert part != null && part.state() == MOVING : "Partition has invalid state for rebalance " + part; + assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part; ClusterNode histSupplier = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 16d3292..3009feb 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -59,6 +59,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.management.ObjectName; import org.apache.ignite.DataStorageMetrics; import org.apache.ignite.IgniteCheckedException; @@ -1151,9 +1152,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean clusterInTransitionStateToActive = fut.activateCluster(); - // Before local node join event. - if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) + // In case of cluster activation or local join restore, restore whole manager state. + if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) { restoreState(); + } + // In case of starting groups, restore partition states only for these groups. + else if (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart())) { + Set<Integer> restoreGroups = fut.exchangeActions().cacheGroupsToStart().stream() + .map(actionData -> actionData.descriptor().groupId()) + .collect(Collectors.toSet()); + + restorePartitionStates(Collections.emptyMap(), restoreGroups); + } if (cctx.kernalContext().query().moduleEnabled()) { ExchangeActions acts = fut.exchangeActions(); @@ -1393,6 +1403,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Restores from last checkpoint and applies WAL changes since this checkpoint. + * * @throws IgniteCheckedException If failed to restore database status from WAL. */ private void restoreState() throws IgniteCheckedException { @@ -2154,7 +2166,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointReadLock(); try { - restorePartitionState(partStates, Collections.emptySet()); + restorePartitionStates(partStates, null); } finally { checkpointReadUnlock(); @@ -2264,7 +2276,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (!metastoreOnly) - restorePartitionState(partStates, ignoreGrps); + restorePartitionStates(partStates, null); } finally { if (!metastoreOnly) @@ -2277,15 +2289,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param partStates Partition states. - * @throws IgniteCheckedException If failed to restore. + * Initializes not empty partitions and restores their state from page memory or WAL. + * Partition states presented in page memory may be overriden by states restored from WAL {@code partStates}. + * + * @param partStates Partition states restored from WAL. + * @param onlyForGroups If not {@code null} restore states only for specified cache groups. + * @throws IgniteCheckedException If failed to restore partition states. */ - private void restorePartitionState( - Map<T2<Integer, Integer>, T2<Integer, Long>> partStates, - Collection<Integer> ignoreGrps - ) throws IgniteCheckedException { + private void restorePartitionStates(Map<T2<Integer, Integer>, T2<Integer, Long>> partStates, + @Nullable Set<Integer> onlyForGroups) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal() || !grp.affinityNode() || ignoreGrps.contains(grp.groupId())) { + if (grp.isLocal() || !grp.affinityNode()) { // Local cache has no partitions and its states. continue; } @@ -2293,6 +2307,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!grp.dataRegion().config().isPersistenceEnabled()) continue; + if (onlyForGroups != null && !onlyForGroups.contains(grp.groupId())) + continue; + int grpId = grp.groupId(); PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); @@ -2376,56 +2393,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param grpCtx Group context. - * @param partId Partition ID. - * @return Partition state. - */ - public GridDhtPartitionState readPartitionState(CacheGroupContext grpCtx, int partId) { - int grpId = grpCtx.groupId(); - PageMemoryEx pageMem = (PageMemoryEx)grpCtx.dataRegion().pageMemory(); - - try { - if (storeMgr.exists(grpId, partId)) { - storeMgr.ensure(grpId, partId); - - if (storeMgr.pages(grpId, partId) > 1) { - long partMetaId = pageMem.partitionMetaPageId(grpId, partId); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); - - try { - long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); - - try { - if (PageIO.getType(pageAddr) == PageIO.T_PART_META) { - PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - - GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal((int)io.getPartitionState(pageAddr)); - - if (state == null) - state = GridDhtPartitionState.MOVING; - - return state; - } - } - finally { - pageMem.readUnlock(grpId, partMetaId, partMetaPage); - } - } - finally { - pageMem.releasePage(grpId, partMetaId, partMetaPage); - } - } - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to read partition state (will default to MOVING) [grp=" + grpCtx + - ", partId=" + partId + "]", e); - } - - return GridDhtPartitionState.MOVING; - } - - /** * Wal truncate callBack. * * @param highBound WALPointer. http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java index 0d59a2d..c3d404b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java @@ -83,6 +83,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { private boolean delayRebalance; /** */ + private boolean disableAutoActivation; + + /** */ private Map<String, Object> userAttrs; /** */ @@ -107,6 +110,8 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { cleanPersistenceDir(); client = false; + + disableAutoActivation = false; } /** {@inheritDoc} */ @@ -120,6 +125,9 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { cfg.setConsistentId(igniteInstanceName); + if (disableAutoActivation) + cfg.setAutoActivationEnabled(false); + cfg.setDataStorageConfiguration( new DataStorageConfiguration().setDefaultDataRegionConfiguration( new DataRegionConfiguration() @@ -884,11 +892,15 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { delayRebalance = true; + /* There is a problem with handling simultaneous auto activation after restart and manual activation. + To properly catch the moment when cluster activation has finished we temporary disable auto activation. */ + disableAutoActivation = true; + startGrids(4); ig = grid(0); - ig.active(true); + ig.cluster().active(true); cache = ig.cache(cacheName); @@ -990,7 +1002,7 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public void reset() { - delegate.reset();; + delegate.reset(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java index 1e046d4..ebc804f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -93,6 +94,8 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestCacheStoreAdapter())); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 64)); + if (getTestIgniteInstanceName(0).equals(igniteInstanceName)) { if (client) cfg.setClientMode(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java index 63d772a..fc617bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java @@ -106,6 +106,10 @@ public class GridCachePartitionsStateValidationTest extends GridCommonAbstractTe awaitPartitionMapExchange(); + // Populate cache to increment update counters. + for (int i = 0; i < 1000; i++) + ignite.cache(CACHE_NAME).put(i, i); + // Modify update counter for some partition. for (GridDhtLocalPartition partition : ignite.cachex(CACHE_NAME).context().topology().localPartitions()) { partition.updateCounter(100500L); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java index 9ed8d54..43a2303 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import org.junit.Assert; import org.mockito.Matchers; import org.mockito.Mockito; @@ -70,24 +71,21 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac Mockito.when(partitionMock.id()).thenReturn(id); Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter); Mockito.when(partitionMock.fullSize()).thenReturn(size); + Mockito.when(partitionMock.state()).thenReturn(GridDhtPartitionState.OWNING); return partitionMock; } /** - * @return Message containing specified {@code countersMap}. + * @param countersMap Update counters map. + * @param sizesMap Sizes map. + * @return Message with specified {@code countersMap} and {@code sizeMap}. */ - private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, T2<Long, Long>> countersMap) { + private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) { GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(); - msg.addPartitionUpdateCounters(0, countersMap); - return msg; - } - - /** - * @return Message containing specified {@code sizesMap}. - */ - private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> sizesMap) { - GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(); - msg.addPartitionSizes(0, sizesMap); + if (countersMap != null) + msg.addPartitionUpdateCounters(0, countersMap); + if (sizesMap != null) + msg.addPartitionSizes(0, sizesMap); return msg; } @@ -98,15 +96,22 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac UUID remoteNode = UUID.randomUUID(); UUID ignoreNode = UUID.randomUUID(); - // For partitions 0 and 2 (zero counter) we have inconsistent update counters. + // For partitions 0 and 2 we have inconsistent update counters. Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>(); updateCountersMap.put(0, new T2<>(2L, 2L)); updateCountersMap.put(1, new T2<>(2L, 2L)); + updateCountersMap.put(2, new T2<>(5L, 5L)); + + // For partitions 0 and 2 we have inconsistent cache sizes. + Map<Integer, Long> cacheSizesMap = new HashMap<>(); + cacheSizesMap.put(0, 2L); + cacheSizesMap.put(1, 2L); + cacheSizesMap.put(2, 2L); // Form single messages map. Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>(); - messages.put(remoteNode, fromUpdateCounters(updateCountersMap)); - messages.put(ignoreNode, fromUpdateCounters(updateCountersMap)); + messages.put(remoteNode, from(updateCountersMap, cacheSizesMap)); + messages.put(ignoreNode, from(updateCountersMap, cacheSizesMap)); GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock); @@ -120,7 +125,7 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac Assert.assertTrue(result.get(0).get(localNodeId) == 1L); Assert.assertTrue(result.get(0).get(remoteNode) == 2L); Assert.assertTrue(result.get(2).get(localNodeId) == 3L); - Assert.assertTrue(result.get(2).get(remoteNode) == 0L); + Assert.assertTrue(result.get(2).get(remoteNode) == 5L); } /** @@ -130,6 +135,12 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac UUID remoteNode = UUID.randomUUID(); UUID ignoreNode = UUID.randomUUID(); + // For partitions 0 and 2 we have inconsistent update counters. + Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>(); + updateCountersMap.put(0, new T2<>(2L, 2L)); + updateCountersMap.put(1, new T2<>(2L, 2L)); + updateCountersMap.put(2, new T2<>(5L, 5L)); + // For partitions 0 and 2 we have inconsistent cache sizes. Map<Integer, Long> cacheSizesMap = new HashMap<>(); cacheSizesMap.put(0, 2L); @@ -138,8 +149,8 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac // Form single messages map. Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>(); - messages.put(remoteNode, fromCacheSizes(cacheSizesMap)); - messages.put(ignoreNode, fromCacheSizes(cacheSizesMap)); + messages.put(remoteNode, from(updateCountersMap, cacheSizesMap)); + messages.put(ignoreNode, from(updateCountersMap, cacheSizesMap)); GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStandByClusterSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStandByClusterSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStandByClusterSuite.java index 6039ae3..fd124b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStandByClusterSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStandByClusterSuite.java @@ -47,7 +47,7 @@ public class IgniteStandByClusterSuite extends TestSuite { * @return Test suite. */ public static TestSuite suite() { - TestSuite suite = new TestSuite("Ignite Activate/DeActivate Cluster Test Suit"); + TestSuite suite = new TestSuite("Ignite Activate/DeActivate Cluster Test Suite"); suite.addTestSuite(IgniteClusterActivateDeactivateTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index fc1cea6..dd495cf 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@ -73,6 +73,8 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration c = super.getConfiguration(igniteInstanceName); + c.setConsistentId(igniteInstanceName); + TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(ipFinder); http://git-wip-us.apache.org/repos/asf/ignite/blob/25b25f27/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java index 8e049ac..3ee19d5 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java @@ -58,7 +58,7 @@ public class IgniteChangingBaselineCacheQueryNodeRestartSelfTest extends IgniteC initStoreStrategy(); - grid(0).active(true); + grid(0).cluster().active(true); awaitPartitionMapExchange(); } @@ -74,7 +74,7 @@ public class IgniteChangingBaselineCacheQueryNodeRestartSelfTest extends IgniteC @Override protected IgniteInternalFuture createRestartAction(final AtomicBoolean done, final AtomicInteger restartCnt) throws Exception { return multithreadedAsync(new Callable<Object>() { /** */ - private final long baselineTopChangeInterval = 30 * 1000; + private final long baselineTopChangeInterval = 10 * 1000; /** */ private final int logFreq = 50;
