Repository: ignite Updated Branches: refs/heads/ignite-4154-2 [created] e291914ca
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e291914c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e291914c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e291914c Branch: refs/heads/ignite-4154-2 Commit: e291914ca72762fd4e39867f2d1569f4706e4627 Parents: 6ac5317 Author: sboikov <[email protected]> Authored: Fri Nov 4 14:31:11 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 4 17:03:31 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 137 ++++++++++++++----- .../dht/GridDhtPartitionTopologyImpl.java | 11 +- .../dht/preloader/GridDhtPartitionFullMap.java | 14 ++ .../dht/preloader/GridDhtPartitionMap2.java | 43 +++--- .../GridDhtPartitionsExchangeFuture.java | 85 +++++------- .../preloader/GridDhtPartitionsFullMessage.java | 82 ++++++++++- .../GridDhtPartitionsSingleMessage.java | 62 ++++++++- 7 files changed, 315 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a81bf0f..9097934 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -81,6 +82,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; @@ -762,62 +764,129 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param nodes Nodes. * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); + private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, null, null, true); - boolean useOldApi = false; - boolean compress = true; + if (log.isDebugEnabled()) + log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { - useOldApi = true; - compress = false; + try { + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + + node.id() + ", msg=" + m + ']'); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); + } + } - break; + return true; + } + /** + * @param nodes Target nodes. + * @return Message; + */ + public GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, + GridDhtPartitionExchangeId exchId, + GridCacheVersion lastVer, + boolean compress) { + GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, + lastVer, + exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); + + boolean useOldApi = false; + + if (nodes != null) { + for (ClusterNode node : nodes) { + if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { + useOldApi = true; + compress = false; + + break; + } + else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) + compress = false; } - else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) - compress = false; } m.compress(compress); + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started()) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) { - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence(), - locMap); + if (!cacheCtx.isLocal()) { + boolean ready; + + if (exchId != null) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0; } + else + ready = cacheCtx.started(); + + if (ready) { + GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); + + if (useOldApi) + locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap); - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); + addFullPartitionsMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (exchId != null) + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } } } // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); - if (log.isDebugEnabled()) - log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); + if (exchId != null) + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); + } - for (ClusterNode node : nodes) { - try { - cctx.io().sendNoRetry(node, m, SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + - node.id() + ", msg=" + m + ']'); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); + return m; + } + + private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionFullMap locMap, + Object affKey) { + Integer dupDataCache = null; + + if (compress) { + T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().partitionStateEquals(locMap)) { + GridDhtPartitionFullMap locMap0 = new GridDhtPartitionFullMap(locMap.nodeId(), + locMap.nodeOrder(), + locMap.updateSequence()); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : locMap.entrySet()) + locMap0.put(e.getKey(), e.getValue().emptyCopy()); + + locMap = locMap0; + + dupDataCache = state0.get1(); } + else + dupData.put(affKey, new T2<>(cacheId, locMap)); } - return true; + m.addFullPartitionsMap(cacheId, locMap, dupDataCache); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 871a084..71458fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; + /** */ + private static final Long ZERO = 0L; + /** Context. */ private final GridCacheContext<?, ?> cctx; @@ -1029,7 +1032,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { Long cntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr < e.getValue()) + if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO)) this.cntrMap.put(e.getKey(), e.getValue()); } @@ -1169,7 +1172,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { Long cntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr < e.getValue()) + if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO)) this.cntrMap.put(e.getKey(), e.getValue()); } @@ -1513,9 +1516,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { continue; Long cntr0 = res.get(part.id()); - Long cntr1 = part.updateCounter(); + long cntr1 = part.updateCounter(); - if (cntr0 == null || cntr1 > cntr0) + if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0) res.put(part.id(), cntr1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 498d492..4253cc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -157,6 +157,20 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> return updateSeq; } + public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { + if (size() != fullMap.size()) + return false; + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) { + GridDhtPartitionMap2 m = fullMap.get(e.getKey()); + + if (m == null || !m.map().equals(e.getValue().map())) + return false; + } + + return true; + } + /** * @param updateSeq New update sequence value. * @return Old update sequence value. http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 15b5a2e..5cdafa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -63,25 +63,15 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E /** * @param nodeId Node ID. * @param updateSeq Update sequence number. - */ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq) { - assert nodeId != null; - assert updateSeq > 0; - - this.nodeId = nodeId; - this.updateSeq = updateSeq; - - map = new HashMap<>(); - } - - /** - * @param nodeId Node ID. - * @param updateSeq Update sequence number. + * @param top Topology version. * @param m Map to copy. * @param onlyActive If {@code true}, then only active states will be included. */ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, - Map<Integer, GridDhtPartitionState> m, boolean onlyActive) { + public GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> m, + boolean onlyActive) { assert nodeId != null; assert updateSeq > 0; @@ -99,6 +89,20 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } } + private GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, Map<Integer, GridDhtPartitionState> map, int moving) { + this.nodeId = nodeId; + this.updateSeq = updateSeq; + this.top = top; + this.map = map; + this.moving = moving; + } + + public GridDhtPartitionMap2 emptyCopy() { + Map<Integer, GridDhtPartitionState> map = new HashMap<>(); + + return new GridDhtPartitionMap2(nodeId, updateSeq, top, map, moving); + } + /** * Empty constructor required for {@link Externalizable}. */ @@ -174,6 +178,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } /** + * @param map Partition states map. + */ + public void map(Map<Integer, GridDhtPartitionState> map) { + this.map = map; + } + + /** * @return Node ID. */ public UUID nodeId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6a17583..52ed262 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -933,10 +935,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { + boolean compress = + node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0; + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, clientOnlyExchange, cctx.versions().last(), - node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); + compress); + + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { @@ -945,7 +952,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); + Integer dupDataCache = null; + + if (compress) { + Object affKey = cacheCtx.affinity().affinityCache().similarAffinityKey(); + + T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().equals(locMap.map())) { + dupDataCache = state0.get1(); + + locMap.map(Collections.<Integer, GridDhtPartitionState>emptyMap()); + } + else + dupData.put(affKey, new T2<>(cacheCtx.cacheId(), locMap.map())); + } + + m.addLocalPartitionMap(cacheCtx.cacheId(), locMap, dupDataCache); m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); } @@ -967,58 +990,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param nodes Target nodes. * @return Message; */ - private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) { + private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) { GridCacheVersion last = lastVer.get(); - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(), - last != null ? last : cctx.versions().last(), - topologyVersion()); - - boolean useOldApi = false; - boolean compress = true; - - if (nodes != null) { - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { - useOldApi = true; - compress = false; - - break; - } - else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) - compress = false; - } - } - - m.compress(compress); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); - - boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0; - - if (ready) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap); - - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); - - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } - } - - // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); - - m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); - } - - return m; + return cctx.exchange().createPartitionsMessage(nodes, + exchangeId(), last != null ? last : cctx.versions().last(), + compress); } /** @@ -1026,7 +1003,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes); + GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true); if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + @@ -1244,7 +1221,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(null); + GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 63d63e2..e5a2828 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -22,14 +22,20 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; + import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; @@ -48,6 +54,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private Map<Integer, GridDhtPartitionFullMap> parts; /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + + /** */ private byte[] partsBytes; /** Partitions update counters. */ @@ -63,7 +73,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ @GridDirectTransient - private boolean compress; + private transient boolean compress; /** * Required by {@link Externalizable}. @@ -103,11 +113,29 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param fullMap Full partitions map. */ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + addFullPartitionsMap(cacheId, fullMap, null); + } + + /** + * @param cacheId Cache ID. + * @param fullMap Full partitions map. + */ + public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); - if (!parts.containsKey(cacheId)) + if (!parts.containsKey(cacheId)) { parts.put(cacheId, fullMap); + + if (dupDataCache != null) { + assert parts.containsKey(dupDataCache); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } + } } /** @@ -197,6 +225,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionFullMap map1 = parts.get(e.getKey()); + + assert map1 != null : e.getKey(); + + GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + + assert map2 != null : e.getValue(); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map1.entrySet()) { + GridDhtPartitionMap2 partMap1 = e0.getValue(); + + assert partMap1.map().isEmpty(); + + GridDhtPartitionMap2 partMap2 = map1.get(e0.getKey()); + + assert partMap2 != null; + + partMap1.map(new HashMap<>(partMap2.map())); + } + } + } } if (parts == null) @@ -229,18 +283,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 6: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 8: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 9: if (!writer.writeMessage("topVer", topVer)) return false; @@ -263,7 +323,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 6: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -271,7 +331,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 7: - partsBytes = reader.readByteArray("partsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -279,6 +339,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 8: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -298,7 +366,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index a37e092..134a3b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -23,12 +23,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectTransient private Map<Integer, GridDhtPartitionMap2> parts; + /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + /** Serialized partitions. */ private byte[] partsBytes; @@ -60,7 +68,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes private boolean client; /** */ - private boolean compress; + @GridDirectTransient + private transient boolean compress; /** * Required by {@link Externalizable}. @@ -104,6 +113,19 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes parts.put(cacheId, locMap); } + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, Integer dupDataCache) { + addLocalPartitionMap(cacheId, locMap); + + if (dupDataCache != null) { + assert F.isEmpty(locMap.map()); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } + } + /** * @param cacheId Cache ID. * @param cntrMap Partition update counters. @@ -192,6 +214,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes else partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionMap2 map1 = parts.get(e.getKey()); + + assert map1 != null : e.getKey(); + assert F.isEmpty(map1.map()); + + GridDhtPartitionMap2 map2 = parts.get(e.getValue()); + + assert map2 != null : e.getValue(); + assert map2.map() != null; + + map1.map(new HashMap<>(map2.map())); + } + } } /** {@inheritDoc} */ @@ -216,12 +256,18 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 7: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + return false; + + writer.incrementState(); + + case 9: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -252,7 +298,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 7: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -260,6 +306,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 8: + partCntrsBytes = reader.readByteArray("partCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -279,7 +333,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */
