Repository: ignite Updated Branches: refs/heads/master fa42218e5 -> 129be29e9
ignite-5872 Fixed backward compatibility (cherry picked from commit cca9117) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/129be29e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/129be29e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/129be29e Branch: refs/heads/master Commit: 129be29e96cfaba3bb7645c66981de62646f820c Parents: fa42218 Author: sboikov <[email protected]> Authored: Mon Aug 21 18:39:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 23 12:22:15 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 47 ++++++++--- .../dht/GridClientPartitionTopology.java | 9 +++ .../dht/GridDhtPartitionTopology.java | 5 ++ .../dht/GridDhtPartitionTopologyImpl.java | 5 ++ .../CachePartitionFullCountersMap.java | 36 +++++++++ .../CachePartitionPartialCountersMap.java | 23 ++++++ .../GridDhtPartitionsExchangeFuture.java | 41 +++++++--- .../preloader/GridDhtPartitionsFullMessage.java | 84 +++++++++++++++++--- .../GridDhtPartitionsSingleMessage.java | 23 ++++-- .../IgniteDhtPartitionCountersMap.java | 14 ++-- .../IgniteDhtPartitionCountersMap2.java | 69 ++++++++++++++++ 11 files changed, 315 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 200f677..984721b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -65,6 +65,8 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -973,7 +975,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private void sendAllPartitions(Collection<ClusterNode> nodes, AffinityTopologyVersion msgTopVer) { - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null); m.topologyVersion(msgTopVer); @@ -1000,6 +1002,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ * finishUnmarshall methods are called). + * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}. * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. * @param partHistSuppliers Partition history suppliers map. @@ -1008,6 +1011,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( boolean compress, + boolean newCntrMap, @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @@ -1046,8 +1050,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affCache.similarAffinityKey()); } - if (exchId != null) - m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters()); + if (exchId != null) { + CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters(); + + if (newCntrMap) + m.addPartitionUpdateCounters(grp.groupId(), cntrsMap); + else { + m.addPartitionUpdateCounters(grp.groupId(), + CachePartitionFullCountersMap.toCountersMap(cntrsMap)); + } + } } } @@ -1064,8 +1076,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top.similarAffinityKey()); } - if (exchId != null) - m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters()); + if (exchId != null) { + CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters(); + + if (newCntrMap) + m.addPartitionUpdateCounters(top.groupId(), cntrsMap); + else + m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap)); + } } return m; @@ -1119,6 +1137,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false, + false, null); if (log.isDebugEnabled()) @@ -1141,12 +1160,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param exchangeId Exchange ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. + * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}. * @return Message. */ public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( @Nullable GridDhtPartitionExchangeId exchangeId, boolean clientOnlyExchange, boolean sndCounters, + boolean newCntrMap, ExchangeActions exchActions ) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, @@ -1167,8 +1188,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana locMap, grp.affinity().similarAffinityKey()); - if (sndCounters) - m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters(true)); + if (sndCounters) { + CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true); + + m.addPartitionUpdateCounters(grp.groupId(), + newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + } } } @@ -1185,8 +1210,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana locMap, top.similarAffinityKey()); - if (sndCounters) - m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters(true)); + if (sndCounters) { + CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true); + + m.addPartitionUpdateCounters(top.groupId(), + newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + } } return m; http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 77792c7..c8856fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -115,6 +115,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** */ private volatile DiscoCache discoCache; + /** */ + private final int parts; + /** * @param cctx Context. * @param grpId Group ID. @@ -130,6 +133,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { this.cctx = cctx; this.grpId = grpId; this.similarAffKey = similarAffKey; + this.parts = parts; topVer = AffinityTopologyVersion.NONE; @@ -142,6 +146,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { cntrMap = new CachePartitionFullCountersMap(parts); } + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + /** * @return Key to find caches with similar affinity. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 22205ea..4ae68ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -43,6 +43,11 @@ import org.jetbrains.annotations.Nullable; @GridToStringExclude public interface GridDhtPartitionTopology { /** + * @return Total cache partitions. + */ + public int partitions(); + + /** * Locks the topology, usually during mapping on locks or transactions. */ public void readLock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 16fe012..f25ae21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -161,6 +161,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public int partitions() { + return grp.affinityFunction().partitions(); + } + + /** {@inheritDoc} */ @Override public int groupId() { return grp.groupId(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java index 1384a55..ebc993c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Serializable; import java.util.Arrays; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; /** * @@ -96,4 +99,37 @@ public class CachePartitionFullCountersMap implements Serializable { Arrays.fill(initialUpdCntrs, 0); Arrays.fill(updCntrs, 0); } + + /** + * @param map Full counters map. + * @return Regular java map with counters. + */ + public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionFullCountersMap map) { + int partsCnt = map.updCntrs.length; + + Map<Integer, T2<Long, Long>> map0 = U.newHashMap(partsCnt); + + for (int p = 0; p < partsCnt; p++) + map0.put(p, new T2<>(map.initialUpdCntrs[p], map.updCntrs[p])); + + return map0; + } + + /** + * @param map Regular java map with counters. + * @param partsCnt Total cache partitions. + * @return Full counters map. + */ + static CachePartitionFullCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) { + CachePartitionFullCountersMap map0 = new CachePartitionFullCountersMap(partsCnt); + + for (Map.Entry<Integer, T2<Long, Long>> e : map.entrySet()) { + T2<Long, Long> cntrs = e.getValue(); + + map0.initialUpdCntrs[e.getKey()] = cntrs.get1(); + map0.updCntrs[e.getKey()] = cntrs.get2(); + } + + return map0; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java index 851ffed..83c0231 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java @@ -21,8 +21,10 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.TreeMap; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; /** * @@ -32,6 +34,9 @@ public class CachePartitionPartialCountersMap implements Serializable { private static final long serialVersionUID = 0L; /** */ + static final IgniteProductVersion PARTIAL_COUNTERS_MAP_SINCE = IgniteProductVersion.fromString("2.1.4"); + + /** */ public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap(); /** */ @@ -158,4 +163,22 @@ public class CachePartitionPartialCountersMap implements Serializable { return res; } + + /** + * @param map Partition ID to partition counters map. + * @param partsCnt Total cache partitions. + * @return Partial local counters map. + */ + static CachePartitionPartialCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) { + CachePartitionPartialCountersMap map0 = new CachePartitionPartialCountersMap(partsCnt); + + TreeMap<Integer, T2<Long, Long>> sorted = new TreeMap<>(map); + + for (Map.Entry<Integer, T2<Long, Long>> e : sorted.entrySet()) + map0.add(e.getKey(), e.getValue().get1(), e.getValue().get2()); + + map0.trim(); + + return map0; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ceb5abc..8e0deb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -106,6 +107,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE; /** * Future for exchanging partition maps. @@ -1231,6 +1233,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), false, true, + node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0, exchActions); Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; @@ -1258,13 +1261,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param compress Message compress flag. + * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}. * @return Message. */ - private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) { + private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress, + boolean newCntrMap) { GridCacheVersion last = lastVer.get(); GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage( compress, + newCntrMap, exchangeId(), last != null ? last : cctx.versions().last(), partHistSuppliers, @@ -1797,9 +1803,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (finishState0 == null) { assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode()) : this; + ClusterNode node = cctx.node(nodeId); + + if (node == null) + return; + finishState0 = new FinishState(cctx.localNodeId(), initialVersion(), - createPartitionsMessage(true)); + createPartitionsMessage(true, node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0)); } sendAllPartitionsToNode(finishState0, msg, nodeId); @@ -1937,7 +1948,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(false); + GridDhtPartitionsFullMessage m = createPartitionsMessage(false, false); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); @@ -1959,7 +1970,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, Long> minCntrs = new HashMap<>(); for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId()); + CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(), + top.partitions()); assert nodeCntrs != null; @@ -2235,7 +2247,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionTopology top = grp != null ? grp.topology() : cctx.exchange().clientTopology(grpId); - CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId); + CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId, + top.partitions()); if (cntrs != null) top.collectUpdateCounters(cntrs); @@ -2283,7 +2296,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.versions().onExchange(lastVer.get().order()); - GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion(); + + GridDhtPartitionsFullMessage msg = createPartitionsMessage(true, + minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0); if (exchCtx.mergeExchanges()) { assert !centralizedAff; @@ -2571,6 +2587,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.restoreExchangeId(), cctx.kernalContext().clientNode(), true, + node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0, exchActions); if (localJoinExchange() && finishState0 == null) @@ -2745,11 +2762,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); - CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId); - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) { + CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, + grp.topology().partitions()); + grp.topology().update(resTopVer, entry.getValue(), cntrMap, @@ -2760,7 +2778,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) { - cctx.exchange().clientTopology(grpId).update(resTopVer, + GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId); + + CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId, + top.partitions()); + + top.update(resTopVer, entry.getValue(), cntrMap, Collections.<Integer>emptySet(), http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 2bb19cd..edbfc23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -68,6 +69,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Serialized partitions counters. */ private byte[] partCntrsBytes; + /** Partitions update counters. */ + @GridToStringInclude + @GridDirectTransient + private IgniteDhtPartitionCountersMap2 partCntrs2; + + /** Serialized partitions counters. */ + private byte[] partCntrsBytes2; + /** Partitions history suppliers. */ @GridToStringInclude @GridDirectTransient @@ -149,6 +158,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.partsBytes = partsBytes; cp.partCntrs = partCntrs; cp.partCntrsBytes = partCntrsBytes; + cp.partCntrs2 = partCntrs2; + cp.partCntrsBytes2 = partCntrsBytes2; cp.partHistSuppliers = partHistSuppliers; cp.partHistSuppliersBytes = partHistSuppliersBytes; cp.partsToReload = partsToReload; @@ -275,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) { + public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); @@ -284,10 +295,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param grpId Cache group ID. + * @param cntrMap Partition update counters. + */ + public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) { + if (partCntrs2 == null) + partCntrs2 = new IgniteDhtPartitionCountersMap2(); + + partCntrs2.putIfAbsent(grpId, cntrMap); + } + + /** + * @param grpId Cache group ID. + * @param partsCnt Total cache partitions. * @return Partition update counters. */ - public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) { - return partCntrs == null ? null : partCntrs.get(grpId); + public CachePartitionFullCountersMap partitionUpdateCounters(int grpId, int partsCnt) { + if (partCntrs2 != null) + return partCntrs2.get(grpId); + + if (partCntrs == null) + return null; + + Map<Integer, T2<Long, Long>> map = partCntrs.get(grpId); + + return map != null ? CachePartitionFullCountersMap.fromCountersMap(map, partsCnt) : null; } /** @@ -327,6 +358,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa boolean marshal = (!F.isEmpty(parts) && partsBytes == null) || (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) || + (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) || (partHistSuppliers != null && partHistSuppliersBytes == null) || (partsToReload != null && partsToReloadBytes == null) || (!F.isEmpty(errs) && errsBytes == null); @@ -334,6 +366,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; + byte[] partCntrsBytes20 = null; byte[] partHistSuppliersBytes0 = null; byte[] partsToReloadBytes0 = null; byte[] errsBytes0 = null; @@ -344,6 +377,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) partCntrsBytes0 = U.marshal(ctx, partCntrs); + if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) + partCntrsBytes20 = U.marshal(ctx, partCntrs2); + if (partHistSuppliers != null && partHistSuppliersBytes == null) partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers); @@ -359,12 +395,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa try { byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); + byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20); byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); byte[] exsBytesZip = U.zip(errsBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; + partCntrsBytes20 = partCntrsBytes2Zip; partHistSuppliersBytes0 = partHistSuppliersBytesZip; partsToReloadBytes0 = partsToReloadBytesZip; errsBytes0 = exsBytesZip; @@ -378,6 +416,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; + partCntrsBytes2 = partCntrsBytes20; partHistSuppliersBytes = partHistSuppliersBytes0; partsToReloadBytes = partsToReloadBytes0; errsBytes = errsBytes0; @@ -446,6 +485,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partCntrsBytes2 != null && partCntrs2 == null) { + if (compressed()) + partCntrs2 = U.unmarshalZip(ctx.marshaller(), partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partCntrs2 = U.unmarshal(ctx, partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (partHistSuppliersBytes != null && partHistSuppliers == null) { if (compressed()) partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); @@ -520,30 +566,36 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa writer.incrementState(); case 10: - if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) + if (!writer.writeByteArray("partCntrsBytes2", partCntrsBytes2)) return false; writer.incrementState(); case 11: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) return false; writer.incrementState(); case 12: - if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); case 13: - if (!writer.writeMessage("resTopVer", resTopVer)) + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) return false; writer.incrementState(); case 14: + if (!writer.writeMessage("resTopVer", resTopVer)) + return false; + + writer.incrementState(); + + case 15: if (!writer.writeMessage("topVer", topVer)) return false; @@ -606,7 +658,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 10: - partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); + partCntrsBytes2 = reader.readByteArray("partCntrsBytes2"); if (!reader.isLastRead()) return false; @@ -614,7 +666,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 11: - partsBytes = reader.readByteArray("partsBytes"); + partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); if (!reader.isLastRead()) return false; @@ -622,7 +674,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 12: - partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) return false; @@ -630,7 +682,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 13: - resTopVer = reader.readMessage("resTopVer"); + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); if (!reader.isLastRead()) return false; @@ -638,6 +690,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 14: + resTopVer = reader.readMessage("resTopVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -657,7 +717,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 16; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 44815ca..215152d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -61,7 +62,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Partitions update counters. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, CachePartitionPartialCountersMap> partCntrs; + private Map<Integer, Object> partCntrs; /** Serialized partitions counters. */ private byte[] partCntrsBytes; @@ -189,7 +190,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) { + public void addPartitionUpdateCounters(int grpId, Object cntrMap) { if (partCntrs == null) partCntrs = new HashMap<>(); @@ -198,12 +199,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @param grpId Cache group ID. + * @param partsCnt Total cache partitions. * @return Partition update counters. */ - public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) { - CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId); + @SuppressWarnings("unchecked") + public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId, int partsCnt) { + Object res = partCntrs == null ? null : partCntrs.get(grpId); - return res == null ? CachePartitionPartialCountersMap.EMPTY : res; + if (res == null) + return CachePartitionPartialCountersMap.EMPTY; + + if (res instanceof CachePartitionPartialCountersMap) + return (CachePartitionPartialCountersMap)res; + + assert res instanceof Map : res; + + Map<Integer, T2<Long, Long>> map = (Map<Integer, T2<Long, Long>>)res; + + return CachePartitionPartialCountersMap.fromCountersMap(map, partsCnt); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java index e7954d9..dc2fbf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; /** * Partition counters map. @@ -30,7 +32,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable { private static final long serialVersionUID = 0L; /** */ - private Map<Integer, CachePartitionFullCountersMap> map; + private Map<Integer, Map<Integer, T2<Long, Long>>> map; /** * @return {@code True} if map is empty. @@ -43,7 +45,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable { * @param cacheId Cache ID. * @param cntrMap Counters map. */ - public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) { + public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { if (map == null) map = new HashMap<>(); @@ -55,14 +57,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable { * @param cacheId Cache ID. * @return Counters map. */ - public synchronized CachePartitionFullCountersMap get(int cacheId) { + public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) { if (map == null) - return null; + map = new HashMap<>(); - CachePartitionFullCountersMap cntrMap = map.get(cacheId); + Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId); if (cntrMap == null) - return null; + return Collections.emptyMap(); return cntrMap; } http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java new file mode 100644 index 0000000..d1e6d99 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Partition counters map. + */ +public class IgniteDhtPartitionCountersMap2 implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Map<Integer, CachePartitionFullCountersMap> map; + + /** + * @return {@code True} if map is empty. + */ + public synchronized boolean empty() { + return map == null || map.isEmpty(); + } + + /** + * @param cacheId Cache ID. + * @param cntrMap Counters map. + */ + public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) { + if (map == null) + map = new HashMap<>(); + + if (!map.containsKey(cacheId)) + map.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Counters map. + */ + public synchronized CachePartitionFullCountersMap get(int cacheId) { + if (map == null) + return null; + + CachePartitionFullCountersMap cntrMap = map.get(cacheId); + + if (cntrMap == null) + return null; + + return cntrMap; + } +}
