Repository: ignite Updated Branches: refs/heads/ignite-4154-2 e291914ca -> e35b8a582
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e35b8a58 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e35b8a58 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e35b8a58 Branch: refs/heads/ignite-4154-2 Commit: e35b8a582dee804e2058cc237783de5f55e92ad4 Parents: e291914 Author: sboikov <[email protected]> Authored: Mon Nov 7 12:09:26 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 7 12:14:21 2016 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 62 +------ .../affinity/GridAffinityProcessor.java | 81 +++++++++ .../cache/CacheAffinitySharedManager.java | 27 +-- .../GridCachePartitionExchangeManager.java | 182 ++++++++++++++----- .../dht/GridClientPartitionTopology.java | 15 +- .../dht/GridDhtPartitionTopologyImpl.java | 2 +- .../dht/preloader/GridDhtPartitionFullMap.java | 4 + .../dht/preloader/GridDhtPartitionMap2.java | 21 ++- .../GridDhtPartitionsExchangeFuture.java | 55 ++---- .../preloader/GridDhtPartitionsFullMessage.java | 22 ++- .../GridDhtPartitionsSingleMessage.java | 11 +- .../GridCacheSyncReplicatedPreloadSelfTest.java | 3 - .../IgniteCacheSyncRebalanceModeSelfTest.java | 4 +- 13 files changed, 309 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 9166b31..a388c7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -108,7 +108,7 @@ public class GridAffinityAssignmentCache { private final AtomicInteger fullHistSize = new AtomicInteger(); /** */ - private final SimilarAffinityKey similarAffKey; + private final Object similarAffKey; /** * Constructs affinity cached calculations. @@ -147,9 +147,14 @@ public class GridAffinityAssignmentCache { affCache = new ConcurrentSkipListMap<>(); head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); - similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt); + similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt); + + assert similarAffKey != null; } + /** + * @return Key to find caches with similar affinity. + */ public Object similarAffinityKey() { return similarAffKey; } @@ -612,57 +617,4 @@ public class GridAffinityAssignmentCache { return S.toString(AffinityReadyFuture.class, this); } } - - /** - * - */ - private static class SimilarAffinityKey { - /** */ - private final int backups; - - /** */ - private final Class<?> affFuncCls; - - /** */ - private final Class<?> filterCls; - - /** */ - private final int partsCnt; - - /** */ - private final int hash; - - public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { - this.backups = backups; - this.affFuncCls = affFuncCls; - this.filterCls = filterCls; - this.partsCnt = partsCnt; - - int hash = backups; - hash = 31 * hash + affFuncCls.hashCode(); - hash = 31 * hash + filterCls.hashCode(); - hash= 31 * hash + partsCnt; - - this.hash = hash; - } - - @Override public int hashCode() { - return hash; - } - - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - SimilarAffinityKey key = (SimilarAffinityKey)o; - - return backups == key.backups && - affFuncCls == key.affFuncCls && - filterCls == key.filterCls && - partsCnt == key.partsCnt; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index d3783f0..b9182ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -568,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return nodes.iterator().next(); } + /** + * @param aff Affinity function. + * @param nodeFilter Node class. + * @param backups Number of backups. + * @param parts Number of partitions. + * @return Key to find caches with similar affinity. + */ + public Object similaryAffinityKey(AffinityFunction aff, + IgnitePredicate<ClusterNode> nodeFilter, + int backups, + int parts) { + return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -966,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return aff; } } + + /** + * + */ + private static class SimilarAffinityKey { + /** */ + private final int backups; + + /** */ + private final Class<?> affFuncCls; + + /** */ + private final Class<?> filterCls; + + /** */ + private final int partsCnt; + + /** */ + private final int hash; + + /** + * @param affFuncCls Affinity function class. + * @param filterCls Node filter class. + * @param backups Number of backups. + * @param partsCnt Number of partitions. + */ + SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { + this.backups = backups; + this.affFuncCls = affFuncCls; + this.filterCls = filterCls; + this.partsCnt = partsCnt; + + int hash = backups; + hash = 31 * hash + affFuncCls.hashCode(); + hash = 31 * hash + filterCls.hashCode(); + hash= 31 * hash + partsCnt; + + this.hash = hash; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return hash; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + SimilarAffinityKey key = (SimilarAffinityKey)o; + + return backups == key.backups && + affFuncCls == key.affFuncCls && + filterCls == key.filterCls && + partsCnt == key.partsCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SimilarAffinityKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 88f1f97..2890887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param node Event node. * @param topVer Topology version. */ - public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. registeredCaches.clear(); @@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param msg Customer message. * @return {@code True} if minor topology version should be increased. */ - public boolean onCustomEvent(CacheAffinityChangeMessage msg) { + boolean onCustomEvent(CacheAffinityChangeMessage msg) { assert lateAffAssign : msg; if (msg.exchangeId() != null) { @@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param top Topology. * @param checkCacheId Cache ID. */ - public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { + void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { if (!lateAffAssign) return; @@ -1246,6 +1246,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. + * @param affCache Already calculated assignments (to reduce data stored in history). * @throws IgniteCheckedException If failed. */ private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, @@ -1303,10 +1304,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param aff - * @param assign - * @param affCache - * @return + * @param aff Assignment cache. + * @param assign Assignment. + * @param affCache Assignments already calculated for other caches. + * @return Assignment. */ private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, List<List<ClusterNode>> assign, @@ -1393,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ - public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) + private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -1580,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity cache. * @param initAff Existing affinity cache. */ - public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder(boolean rebalanceEnabled, + GridAffinityAssignmentCache aff, + @Nullable GridAffinityAssignmentCache initAff) { this.aff = aff; if (initAff != null) @@ -1632,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created cache is started on coordinator. */ - class CacheHolder1 extends CacheHolder { + private class CacheHolder1 extends CacheHolder { /** */ private final GridCacheContext cctx; @@ -1640,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param cctx Cache context. * @param initAff Current affinity. */ - public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff); assert !cctx.isLocal() : cctx.name(); @@ -1677,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created if cache is not started on coordinator. */ - static class CacheHolder2 extends CacheHolder { + private static class CacheHolder2 extends CacheHolder { /** */ private final GridCacheSharedContext cctx; http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 9097934..953ab8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,6 +21,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -44,7 +45,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -56,6 +59,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -534,8 +538,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (top != null) return top; + Object affKey = null; + + DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + + if (desc != null) { + CacheConfiguration ccfg = desc.cacheConfiguration(); + + AffinityFunction aff = ccfg.getAffinity(); + + affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff, + ccfg.getNodeFilter(), + ccfg.getBackups(), + aff.partitions()); + } + GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId, - top = new GridClientPartitionTopology(cctx, cacheId, exchFut)); + top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey)); return old != null ? old : top; } @@ -765,7 +784,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if message was sent, {@code false} if node left grid. */ private boolean sendAllPartitions(Collection<ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, null, null, true); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@ -786,13 +805,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return true; } + /** * @param nodes Target nodes. - * @return Message; + * @param exchId Non-null exchange ID if message is created for exchange. + * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. */ - public GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, - GridDhtPartitionExchangeId exchId, - GridCacheVersion lastVer, + public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes, + @Nullable GridDhtPartitionExchangeId exchId, + @Nullable GridCacheVersion lastVer, boolean compress) { GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, lastVer, @@ -832,8 +855,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (ready) { GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - if (useOldApi) - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap); + if (useOldApi) { + locMap = new GridDhtPartitionFullMap(locMap.nodeId(), + locMap.nodeOrder(), + locMap.updateSequence(), + locMap); + } addFullPartitionsMap(m, dupData, @@ -850,7 +877,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // It is important that client topologies be added after contexts. for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); + GridDhtPartitionFullMap map = top.partitionMap(true); + + addFullPartitionsMap(m, + dupData, + compress, + top.cacheId(), + map, + top.similarAffinityKey()); if (exchId != null) m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); @@ -859,34 +893,42 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return m; } + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, boolean compress, Integer cacheId, - GridDhtPartitionFullMap locMap, + GridDhtPartitionFullMap map, Object affKey) { Integer dupDataCache = null; - if (compress) { + if (compress && affKey != null && !m.containsCache(cacheId)) { T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey); - if (state0 != null && state0.get2().partitionStateEquals(locMap)) { - GridDhtPartitionFullMap locMap0 = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence()); + if (state0 != null && state0.get2().partitionStateEquals(map)) { + GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(), + map.nodeOrder(), + map.updateSequence()); - for (Map.Entry<UUID, GridDhtPartitionMap2> e : locMap.entrySet()) - locMap0.put(e.getKey(), e.getValue().emptyCopy()); + for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet()) + map0.put(e.getKey(), e.getValue().emptyCopy()); - locMap = locMap0; + map = map0; dupDataCache = state0.get1(); } else - dupData.put(affKey, new T2<>(cacheId, locMap)); + dupData.put(affKey, new T2<>(cacheId, map)); } - m.addFullPartitionsMap(cacheId, locMap, dupDataCache); + m.addFullPartitionsMap(cacheId, map, dupDataCache); } /** @@ -894,27 +936,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param id ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, + id, cctx.kernalContext().clientNode(), - cctx.versions().last(), - node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); - - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); - - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - } - } - - for (GridClientPartitionTopology top : clientTops.values()) { - GridDhtPartitionMap2 locMap = top.localPartitionMap(); - - m.addLocalPartitionMap(top.cacheId(), locMap); - } + false); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@ -933,6 +958,81 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param targetNode Target node. + * @param exchangeId ID. + * @param clientOnlyExchange Client exchange flag. + * @param sndCounters {@code True} if need send partition update counters. + * @return Message. + */ + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode, + @Nullable GridDhtPartitionExchangeId exchangeId, + boolean clientOnlyExchange, + boolean sndCounters) + { + boolean compress = + targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0; + + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, + clientOnlyExchange, + cctx.versions().last(), + compress); + + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + + if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) + locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + + addPartitionMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (sndCounters) + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } + } + + return m; + } + + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ + private void addPartitionMap(GridDhtPartitionsSingleMessage m, + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionMap2 map, + Object affKey) { + Integer dupDataCache = null; + + if (compress) { + T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().equals(map.map())) { + dupDataCache = state0.get1(); + + map.map(Collections.<Integer, GridDhtPartitionState>emptyMap()); + } + else + dupData.put(affKey, new T2<>(cacheId, map.map())); + } + + m.addLocalPartitionMap(cacheId, map, dupDataCache); + } + + /** * @param nodeId Cause node ID. * @param topVer Topology version. * @param evt Event type. @@ -949,7 +1049,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param affChangeMsg Affinity change message. * @return Exchange future. */ - GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, + private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 58933b7..4418b11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -97,18 +97,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Partition update counters. */ private Map<Integer, Long> cntrMap = new HashMap<>(); + /** */ + private final Object similarAffKey; + /** * @param cctx Context. * @param cacheId Cache ID. * @param exchFut Exchange ID. + * @param similarAffKey Key to find caches with similar affinity. */ public GridClientPartitionTopology( GridCacheSharedContext cctx, int cacheId, - GridDhtPartitionsExchangeFuture exchFut + GridDhtPartitionsExchangeFuture exchFut, + Object similarAffKey ) { this.cctx = cctx; this.cacheId = cacheId; + this.similarAffKey = similarAffKey; topVer = exchFut.topologyVersion(); @@ -125,6 +131,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** + * @return Key to find caches with similar affinity. + */ + @Nullable public Object similarAffinityKey() { + return similarAffKey; + } + + /** * @return Full map string representation. */ @SuppressWarnings( {"ConstantConditions"}) http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 71458fb..08a1c89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1518,7 +1518,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { Long cntr0 = res.get(part.id()); long cntr1 = part.updateCounter(); - if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0) + if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0L) res.put(part.id(), cntr1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 4253cc2..8f5ad17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -157,6 +157,10 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> return updateSeq; } + /** + * @param fullMap Map. + * @return {@code True} if this map and given map contain the same data. + */ public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { if (size() != fullMap.size()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 5cdafa1..930d5fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -89,7 +89,18 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } } - private GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, Map<Integer, GridDhtPartitionState> map, int moving) { + /** + * @param nodeId Node ID. + * @param updateSeq Update sequence number. + * @param top Topology version. + * @param map Map. + * @param moving Number of moving partitions. + */ + private GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> map, + int moving) { this.nodeId = nodeId; this.updateSeq = updateSeq; this.top = top; @@ -97,8 +108,11 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E this.moving = moving; } + /** + * @return Copy with empty partition state map. + */ public GridDhtPartitionMap2 emptyCopy() { - Map<Integer, GridDhtPartitionState> map = new HashMap<>(); + Map<Integer, GridDhtPartitionState> map = U.newHashMap(0); return new GridDhtPartitionMap2(nodeId, updateSeq, top, map, moving); } @@ -288,9 +302,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E long ver = in.readLong(); int minorVer = in.readInt(); - if (ver != 0) { + if (ver != 0) top = new AffinityTopologyVersion(ver, minorVer); - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 52ed262..e5b4c2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -680,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (!centralizedAff) - sendLocalPartitions(crd, exchId); + sendLocalPartitions(crd); initDone(); @@ -930,49 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param node Node. - * @param id ID. * @throws IgniteCheckedException If failed. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) + private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { - boolean compress = - node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0; - - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node, + exchangeId(), clientOnlyExchange, - cctx.versions().last(), - compress); - - Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); - - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); - - Integer dupDataCache = null; - - if (compress) { - Object affKey = cacheCtx.affinity().affinityCache().similarAffinityKey(); - - T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey); - - if (state0 != null && state0.get2().equals(locMap.map())) { - dupDataCache = state0.get1(); - - locMap.map(Collections.<Integer, GridDhtPartitionState>emptyMap()); - } - else - dupData.put(affKey, new T2<>(cacheCtx.cacheId(), locMap.map())); - } - - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap, dupDataCache); - - m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } + true); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -988,13 +953,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param nodes Target nodes. - * @return Message; + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. */ private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) { GridCacheVersion last = lastVer.get(); - return cctx.exchange().createPartitionsMessage(nodes, - exchangeId(), last != null ? last : cctx.versions().last(), + return cctx.exchange().createPartitionsFullMessage(nodes, + exchangeId(), + last != null ? last : cctx.versions().last(), compress); } @@ -1017,7 +984,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void sendPartitions(ClusterNode oldestNode) { try { - sendLocalPartitions(oldestNode, exchId); + sendLocalPartitions(oldestNode); } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index e5a2828..304c183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -29,10 +29,8 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -97,6 +95,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa this.topVer = topVer; } + /** + * @param compress {@code True} if it is possible to use compression for message. + */ public void compress(boolean compress) { this.compress = compress; } @@ -110,17 +111,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param cacheId Cache ID. - * @param fullMap Full partitions map. + * @return {@code True} if message contains full map for given cache. */ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { - addFullPartitionsMap(cacheId, fullMap, null); + public boolean containsCache(int cacheId) { + return parts != null && parts.containsKey(cacheId); } /** * @param cacheId Cache ID. * @param fullMap Full partitions map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, Integer dupDataCache) { + public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); @@ -238,12 +240,12 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa assert map2 != null : e.getValue(); - for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map1.entrySet()) { - GridDhtPartitionMap2 partMap1 = e0.getValue(); + for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) { + GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey()); - assert partMap1.map().isEmpty(); + assert partMap1 != null && partMap1.map().isEmpty() : partMap1; - GridDhtPartitionMap2 partMap2 = map1.get(e0.getKey()); + GridDhtPartitionMap2 partMap2 = e0.getValue(); assert partMap2 != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 134a3b2..1fbcb6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -26,7 +26,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -82,6 +81,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param exchId Exchange ID. * @param client Client message flag. * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, @@ -101,20 +101,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * Adds partition map to this message. - * * @param cacheId Cache ID to add local partition for. * @param locMap Local partition map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) { + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); parts.put(cacheId, locMap); - } - - public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, Integer dupDataCache) { - addLocalPartitionMap(cacheId, locMap); if (dupDataCache != null) { assert F.isEmpty(locMap.map()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index 87d02a5..cde6b8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ - private static final boolean DISCO_DEBUG_MODE = false; - /** * Constructs test. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e35b8a58/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java index 9b0637e..f3942d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java @@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest { /** Entry count. */ public static final int CNT = 100_000; - public static final String STATIC_CACHE_NAME = "static"; + + /** */ + private static final String STATIC_CACHE_NAME = "static"; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
