Repository: ignite Updated Branches: refs/heads/ignite-2.5 81ba6c842 -> db8865a7b
IGNITE-8405 Update partition owners during exchange in 1 operation. - Fixes #3929. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db8865a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db8865a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db8865a7 Branch: refs/heads/ignite-2.5 Commit: db8865a7b0dd0103f19852bf342b3b4e21cbb0e4 Parents: 81ba6c8 Author: Pavel Kovalenko <[email protected]> Authored: Thu May 3 17:05:40 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu May 3 17:40:01 2018 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 67 ++++++++++------ .../dht/GridDhtPartitionTopology.java | 15 ++-- .../dht/GridDhtPartitionTopologyImpl.java | 83 +++++++++++++------- .../GridDhtPartitionsExchangeFuture.java | 21 +++-- .../util/tostring/GridToStringBuilder.java | 65 +++++++++++++-- ...eBaselineCacheQueryNodeRestartsSelfTest.java | 2 +- 6 files changed, 174 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 477d316..b365233 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -50,15 +51,14 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; /** * Partition topology for node which does not have any local partitions. @@ -1138,39 +1138,62 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) { - Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>(); + @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) { + Map<UUID, Set<Integer>> result = new HashMap<>(); lock.writeLock().lock(); try { - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - GridDhtPartitionMap partMap = e.getValue(); - UUID remoteNodeId = e.getKey(); + // Process remote partitions. + for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) { + int part = entry.getKey(); + Set<UUID> newOwners = entry.getValue(); - if (!partMap.containsKey(p)) - continue; + for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) { + UUID remoteNodeId = remotes.getKey(); + GridDhtPartitionMap partMap = remotes.getValue(); - if (partMap.get(p) == OWNING && !owners.contains(remoteNodeId)) { - partMap.put(p, MOVING); + GridDhtPartitionState state = partMap.get(part); + + if (state == null || state != OWNING) + continue; - if (!haveHistory) - result.add(remoteNodeId); + if (!newOwners.contains(remoteNodeId)) { + partMap.put(part, MOVING); - partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); + partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); - U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + remoteNodeId + ", groupId=" + grpId + - ", partId=" + p + ", haveHistory=" + haveHistory + "]"); + result.computeIfAbsent(remoteNodeId, n -> new HashSet<>()); + result.get(remoteNodeId).add(part); + } } } - part2node.put(p, owners); + for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) { + UUID nodeId = entry.getKey(); + Set<Integer> partsToRebalance = entry.getValue(); - if (updateSeq) - this.updateSeq.incrementAndGet(); - } - finally { + if (!partsToRebalance.isEmpty()) { + Set<Integer> historical = partsToRebalance.stream() + .filter(haveHistory::contains) + .collect(Collectors.toSet()); + + // Filter out partitions having WAL history. + partsToRebalance.removeAll(historical); + + U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter " + + "[grpId=" + grpId + + ", nodeId=" + nodeId + + ", partsFull=" + S.compact(partsToRebalance) + + ", partsHistorical=" + S.compact(historical) + "]"); + } + } + + for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) + part2node.put(entry.getKey(), entry.getValue()); + + updateSeq.incrementAndGet(); + } finally { lock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 2df2e89..d6c5450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -389,16 +389,15 @@ public interface GridDhtPartitionTopology { public boolean rebalanceFinished(AffinityTopologyVersion topVer); /** - * Make nodes from provided set owners for a given partition. - * State of all current owners that aren't contained in the set will be reset to MOVING. + * Calculates nodes and partitions which have non-actual state and must be rebalanced. + * State of all current owners that aren't contained in the given {@code ownersByUpdCounters} will be reset to MOVING. * - * @param p Partition ID. - * @param owners Set of new owners. - * @param haveHistory {@code True} if there is WAL history to rebalance given partition. - * @param updateSeq If should increment sequence when updated. - * @return Set of node IDs that should reload partitions. + * @param ownersByUpdCounters Map (partition, set of node IDs that have most actual state about partition + * (update counter is maximal) and should hold OWNING state for such partition). + * @param haveHistory Set of partitions which have WAL history to rebalance. + * @return Map (nodeId, set of partitions that should be rebalanced <b>fully</b> by this node). */ - public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq); + public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory); /** * Callback on exchange done. http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0a08fe1..aa32902 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -2076,8 +2078,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Set<UUID> setOwners(int p, Set<UUID> ownersByUpdCounters, boolean haveHistory, boolean updateSeq) { - Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>(); + @Override public Map<UUID, Set<Integer>> resetOwners(Map<Integer, Set<UUID>> ownersByUpdCounters, Set<Integer> haveHistory) { + Map<UUID, Set<Integer>> result = new HashMap<>(); ctx.database().checkpointReadLock(); @@ -2085,50 +2087,73 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - GridDhtLocalPartition locPart = locParts.get(p); + // First process local partitions. + for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) { + int part = entry.getKey(); + Set<UUID> newOwners = entry.getValue(); + + GridDhtLocalPartition locPart = localPartition(part); - if (locPart != null) { - if (locPart.state() == OWNING && !ownersByUpdCounters.contains(ctx.localNodeId())) { - rebalancePartition(p, haveHistory); + if (locPart == null || locPart.state() != OWNING) + continue; - if (!haveHistory) - result.add(ctx.localNodeId()); + if (!newOwners.contains(ctx.localNodeId())) { + rebalancePartition(part, haveHistory.contains(part)); - U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + ctx.localNodeId() + ", grp=" + grp.cacheOrGroupName() + - ", partId=" + p + ", haveHistory=" + haveHistory + "]"); + result.computeIfAbsent(ctx.localNodeId(), n -> new HashSet<>()); + result.get(ctx.localNodeId()).add(part); } } - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { - UUID remoteNodeId = e.getKey(); - GridDhtPartitionMap partMap = e.getValue(); + // Then process remote partitions. + for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) { + int part = entry.getKey(); + Set<UUID> newOwners = entry.getValue(); - if (!partMap.containsKey(p)) - continue; + for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) { + UUID remoteNodeId = remotes.getKey(); + GridDhtPartitionMap partMap = remotes.getValue(); + + GridDhtPartitionState state = partMap.get(part); - if (partMap.get(p) == OWNING && !ownersByUpdCounters.contains(remoteNodeId)) { - partMap.put(p, MOVING); + if (state == null || state != OWNING) + continue; - if (!haveHistory) - result.add(remoteNodeId); + if (!newOwners.contains(remoteNodeId)) { + partMap.put(part, MOVING); - partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); + partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); - if (partMap.nodeId().equals(ctx.localNodeId())) - this.updateSeq.setIfGreater(partMap.updateSequence()); + if (partMap.nodeId().equals(ctx.localNodeId())) + updateSeq.setIfGreater(partMap.updateSequence()); - U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + - "[nodeId=" + remoteNodeId + ", grp=" + grp.cacheOrGroupName() + - ", partId=" + p + ", haveHistory=" + haveHistory + "]"); + result.computeIfAbsent(remoteNodeId, n -> new HashSet<>()); + result.get(remoteNodeId).add(part); + } } } - if (updateSeq) { - long updSeq = this.updateSeq.incrementAndGet(); + for (Map.Entry<UUID, Set<Integer>> entry : result.entrySet()) { + UUID nodeId = entry.getKey(); + Set<Integer> rebalancedParts = entry.getValue(); + + if (!rebalancedParts.isEmpty()) { + Set<Integer> historical = rebalancedParts.stream() + .filter(haveHistory::contains) + .collect(Collectors.toSet()); - node2part = new GridDhtPartitionFullMap(node2part, updSeq); + // Filter out partitions having WAL history. + rebalancedParts.removeAll(historical); + + U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter " + + "[grp=" + grp.cacheOrGroupName() + + ", nodeId=" + nodeId + + ", partsFull=" + S.compact(rebalancedParts) + + ", partsHistorical=" + S.compact(historical) + "]"); + } } + + node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet()); } finally { lock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 33bd989..39f4ed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -59,7 +59,6 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; -import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -73,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; @@ -2375,19 +2373,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) { - int p = e.getKey(); - long maxCntr = e.getValue().cnt; - - entryLeft--; + Map<Integer, Set<UUID>> ownersByUpdCounters = new HashMap<>(maxCntrs.size()); + for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) + ownersByUpdCounters.put(e.getKey(), e.getValue().nodes); - if (entryLeft != 0 && maxCntr == 0) - continue; + Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory); - Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, haveHistory.contains(p), entryLeft == 0); + for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) { + UUID nodeId = e.getKey(); + Set<Integer> parts = e.getValue(); - for (UUID nodeId : nodesToReload) - partsToReload.put(nodeId, top.groupId(), p); + for (int part : parts) + partsToReload.put(nodeId, top.groupId(), part); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java index 56eef1d..d361112 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java @@ -17,32 +17,36 @@ package org.apache.ignite.internal.util.tostring; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - import java.io.Externalizable; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EventListener; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_COLLECTION_LIMIT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE; /** * Provides auto-generation framework for {@code toString()} output. @@ -1818,4 +1822,51 @@ public class GridToStringBuilder { return cd; } + + /** + * Returns sorted and compacted string representation of given {@code col}. + * Two nearby numbers with difference at most 1 are compacted to one continuous segment. + * E.g. collection of [1, 2, 3, 5, 6, 7, 10] will be compacted to [1-3, 5-7, 10]. + * + * @param col Collection of integers. + * @return Compacted string representation of given collections. + */ + public static String compact(@NotNull Collection<Integer> col) { + if (col.isEmpty()) + return "[]"; + + SB sb = new SB(); + sb.a('['); + + List<Integer> l = new ArrayList<>(col); + Collections.sort(l); + + int left = l.get(0), right = left; + for (int i = 1; i < l.size(); i++) { + int val = l.get(i); + + if (right == val || right + 1 == val) { + right = val; + continue; + } + + if (left == right) + sb.a(left); + else + sb.a(left).a('-').a(right); + + sb.a(',').a(' '); + + left = right = val; + } + + if (left == right) + sb.a(left); + else + sb.a(left).a('-').a(right); + + sb.a(']'); + + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db8865a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java index 81d8038..3248906 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java @@ -50,7 +50,7 @@ public class IgniteStableBaselineCacheQueryNodeRestartsSelfTest extends IgniteCa initStoreStrategy(); - grid(0).active(true); + grid(0).cluster().active(true); stopGrid(gridCount());
