Repository: ignite Updated Branches: refs/heads/ignite-5872 [created] 37e7aca66
IGNITE-5872 - Replace counters map with arrays Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a5a4778 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a5a4778 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a5a4778 Branch: refs/heads/ignite-5872 Commit: 5a5a47788ff1a2f16e0512f435ad3eba1fcb72c6 Parents: d2cb2f7 Author: Alexey Goncharuk <[email protected]> Authored: Mon Aug 7 14:05:28 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Aug 7 14:05:28 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 12 +- .../cache/IgniteCacheOffheapManager.java | 2 +- .../cache/IgniteCacheOffheapManagerImpl.java | 7 +- .../dht/GridClientPartitionTopology.java | 55 +++---- .../dht/GridDhtPartitionTopology.java | 25 +++- .../dht/GridDhtPartitionTopologyImpl.java | 143 ++++++++++-------- .../CachePartitionFullCountersMap.java | 105 ++++++++++++++ .../CachePartitionPartialCountersMap.java | 145 +++++++++++++++++++ .../GridDhtPartitionsAbstractMessage.java | 8 - .../GridDhtPartitionsExchangeFuture.java | 26 ++-- .../preloader/GridDhtPartitionsFullMessage.java | 13 +- .../GridDhtPartitionsSingleMessage.java | 21 +-- .../GridDhtPartitionsSingleRequest.java | 8 - .../IgniteDhtPartitionCountersMap.java | 14 +- .../persistence/GridCacheOffheapManager.java | 2 +- .../continuous/GridContinuousProcessor.java | 7 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 10 +- 18 files changed, 434 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 5d573b2..9762586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -441,7 +441,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (clientTop != null) { grp.topology().update(topVer, clientTop.partitionMap(true), - clientTop.updateCounters(false), + clientTop.fullUpdateCounters(), Collections.<Integer>emptySet(), null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d67d81d..e18eb55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -962,8 +962,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * finishUnmarshall methods are called). * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. - * @param partHistSuppliers - * @param partsToReload + * @param partHistSuppliers Partition history suppliers map. + * @param partsToReload Partitions to reload map. * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage( @@ -1007,7 +1007,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (exchId != null) - m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); + m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters()); } } @@ -1025,7 +1025,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (exchId != null) - m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true)); + m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters()); } return m; @@ -1128,7 +1128,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana grp.affinity().similarAffinityKey()); if (sndCounters) - m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); + m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters()); } } @@ -1146,7 +1146,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top.similarAffinityKey()); if (sndCounters) - m.partitionUpdateCounters(top.groupId(), top.updateCounters(true)); + m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters()); } return m; http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 001848e..4531802 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -414,7 +414,7 @@ public interface IgniteCacheOffheapManager { /** * @return Initial update counter. */ - public Long initialUpdateCounter(); + public long initialUpdateCounter(); /** * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index ba6c89d..9e48d45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1054,8 +1054,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** */ private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>(); - /** Initialized update counter. */ - protected Long initCntr = 0L; + /** Initial update counter. */ + protected long initCntr; /** * @param partId Partition number. @@ -1600,7 +1600,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public Long initialUpdateCounter() { + @Override public long initialUpdateCounter() { return initCntr; } @@ -1629,7 +1629,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param key Key. * @param oldVal Old value. * @param newVal New value. - * @throws IgniteCheckedException If failed. */ private void updateIgfsMetrics( GridCacheContext cctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 4b9826e..cf45120 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -37,6 +37,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@ -46,7 +48,6 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -104,7 +105,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** Partition update counters. */ - private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>(); + private CachePartitionFullCountersMap cntrMap = new CachePartitionFullCountersMap(); /** */ private final Object similarAffKey; @@ -593,7 +594,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { @Override public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - Map<Integer, T2<Long, Long>> cntrMap, + @Nullable CachePartitionFullCountersMap cntrMap, Set<Integer> partsToReload, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) @@ -696,7 +697,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } if (cntrMap != null) - this.cntrMap = new HashMap<>(cntrMap); + this.cntrMap = new CachePartitionFullCountersMap(cntrMap); consistencyCheck(); @@ -711,17 +712,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) { + @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { assert cntrMap != null; lock.writeLock().lock(); try { - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); + for (int i = 0; i < cntrMap.size(); i++) { + int pId = cntrMap.partitionAt(i); - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); + long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); + long updateCntr = cntrMap.updateCounterAt(i); + + if (this.cntrMap.updateCounter(pId) < updateCntr) { + this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); + this.cntrMap.updateCounter(pId, updateCntr); + } } } finally { @@ -729,6 +735,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + /** {@inheritDoc} */ + @Override public void applyUpdateCounters() { + // No-op on client topology. + } + /** * Method checks is new partition map more stale than current partition map * New partition map is stale if topology version or update sequence are less than of current map @@ -1043,33 +1054,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) { + @Override public CachePartitionFullCountersMap fullUpdateCounters() { lock.readLock().lock(); try { - if (skipZeros) { - Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrMap.size()); - - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> val = e.getValue(); - - if (val.get1() == 0L && val.get2() == 0L) - continue; - - res.put(e.getKey(), e.getValue()); - } - - return res; - } - else - return new HashMap<>(cntrMap); -} + return new CachePartitionFullCountersMap(cntrMap); + } finally { lock.readLock().unlock(); } } /** {@inheritDoc} */ + @Override public CachePartitionPartialCountersMap localUpdateCounters() { + return CachePartitionPartialCountersMap.EMPTY; + } + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { assert false : "Should not be called on non-affinity node"; http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 8688c4f..bab9030 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -29,12 +28,13 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -249,7 +249,7 @@ public interface GridDhtPartitionTopology { * means full map received is not related to exchange * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @param partsToReload + * @param partsToReload Set of partitions that need to be reloaded. * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not * related to exchange. Value should be not less than previous 'Topology version from exchange'. * @return {@code True} if local state was changed. @@ -257,7 +257,7 @@ public interface GridDhtPartitionTopology { public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap, + @Nullable CachePartitionFullCountersMap cntrMap, Set<Integer> partsToReload, @Nullable AffinityTopologyVersion msgTopVer); @@ -270,9 +270,16 @@ public interface GridDhtPartitionTopology { GridDhtPartitionMap parts); /** + * Collects update counters collected during exchange. Called on coordinator. + * * @param cntrMap Counters map. */ - public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap); + public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap); + + /** + * Applies update counters collected during exchange on coordinator. Called on coordinator. + */ + public void applyUpdateCounters(); /** * Checks if there is at least one owner for each partition in the cache topology. @@ -296,10 +303,14 @@ public interface GridDhtPartitionTopology { public Collection<Integer> lostPartitions(); /** - * @param skipZeros If {@code true} then filters out zero counters. * @return Partition update counters. */ - public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros); + public CachePartitionFullCountersMap fullUpdateCounters(); + + /** + * @return Partition update counters. + */ + public CachePartitionPartialCountersMap localUpdateCounters(); /** * @param part Partition to own. http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index d7a224c..ff6bf7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@ -52,7 +54,6 @@ import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -130,7 +131,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); /** Partition update counter. */ - private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>(); + private final CachePartitionFullCountersMap cntrMap; /** */ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; @@ -139,8 +140,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param ctx Cache shared context. * @param grp Cache group. */ - public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, - CacheGroupContext grp) { + public GridDhtPartitionTopologyImpl( + GridCacheSharedContext ctx, + CacheGroupContext grp + ) { assert ctx != null; assert grp != null; @@ -152,6 +155,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); + + cntrMap = new CachePartitionFullCountersMap(locParts.length()); } /** {@inheritDoc} */ @@ -662,10 +667,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (loc == null || loc.state() == EVICTED) { locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - T2<Long, Long> cntr = cntrMap.get(p); + long updCntr = cntrMap.updateCounter(p); - if (cntr != null) - loc.updateCounter(cntr.get2()); + if (updCntr != 0) + loc.updateCounter(updCntr); if (ctx.pageStore() != null) { try { @@ -1102,7 +1107,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Override public boolean update( @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap, + @Nullable CachePartitionFullCountersMap incomeCntrMap, Set<Integer> partsToReload, @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) @@ -1117,14 +1122,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; if (incomeCntrMap != null) { - // update local map partition counters - for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) { - T2<Long, Long> existCntr = this.cntrMap.get(e.getKey()); - - if (existCntr == null || existCntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - // update local counters in partitions for (int i = 0; i < locParts.length(); i++) { GridDhtLocalPartition part = locParts.get(i); @@ -1132,10 +1129,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part == null) continue; - T2<Long, Long> cntr = incomeCntrMap.get(part.id()); + if (part.state() == OWNING || part.state() == MOVING) { + long updCntr = incomeCntrMap.updateCounter(part.id()); - if (cntr != null) - part.updateCounter(cntr.get2()); + if (updCntr != 0 && updCntr > part.updateCounter()) + part.updateCounter(updCntr); + } } } @@ -1255,13 +1254,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert locPart != null; - if (incomeCntrMap != null) { - T2<Long, Long> cntr = incomeCntrMap.get(p); - - if (cntr != null && cntr.get2() > locPart.updateCounter()) - locPart.updateCounter(cntr.get2()); - } - if (locPart.state() == MOVING) { boolean success = locPart.own(); @@ -1281,13 +1273,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed = true; } - - if (incomeCntrMap != null) { - T2<Long, Long> cntr = incomeCntrMap.get(p); - - if (cntr != null && cntr.get2() > locPart.updateCounter()) - locPart.updateCounter(cntr.get2()); - } } else if (state == RENTING && partsToReload.contains(p)) { GridDhtLocalPartition locPart = locParts.get(p); @@ -1337,7 +1322,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) { + @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { assert cntrMap != null; long now = U.currentTimeMillis(); @@ -1356,25 +1341,55 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return; - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - T2<Long, Long> cntr = this.cntrMap.get(e.getKey()); + for (int i = 0; i < cntrMap.size(); i++) { + int pId = cntrMap.partitionAt(i); + + long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); + long updateCntr = cntrMap.updateCounterAt(i); + + if (this.cntrMap.updateCounter(pId) < updateCntr) { + this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); + this.cntrMap.updateCounter(pId, updateCntr); + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void applyUpdateCounters() { + long now = U.currentTimeMillis(); + + lock.writeLock().lock(); + + try { + long acquired = U.currentTimeMillis(); - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); + if (acquired - now >= 100) { + if (timeLog.isInfoEnabled()) + timeLog.info("Waited too long to acquire topology write lock " + + "[cache=" + grp.groupId() + ", waitTime=" + (acquired - now) + ']'); } + if (stopping) + return; + for (int i = 0; i < locParts.length(); i++) { GridDhtLocalPartition part = locParts.get(i); if (part == null) continue; - T2<Long, Long> cntr = cntrMap.get(part.id()); + long updCntr = cntrMap.updateCounter(part.id()); - if (cntr != null && cntr.get2() > part.updateCounter()) - part.updateCounter(cntr.get2()); - else if (part.updateCounter() > 0) - this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter())); + if (updCntr > part.updateCounter()) + part.updateCounter(updCntr); + else if (part.updateCounter() > 0) { + cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter()); + cntrMap.updateCounter(part.id(), part.updateCounter()); + } } } finally { @@ -2046,26 +2061,32 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) { + @Override public CachePartitionFullCountersMap fullUpdateCounters() { lock.readLock().lock(); try { - Map<Integer, T2<Long, Long>> res; + return new CachePartitionFullCountersMap(cntrMap); + } + finally { + lock.readLock().unlock(); + } + } - if (skipZeros) { - res = U.newHashMap(cntrMap.size()); + /** {@inheritDoc} */ + @Override public CachePartitionPartialCountersMap localUpdateCounters() { + lock.readLock().lock(); - for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { - Long cntr = e.getValue().get2(); + try { + int locPartCnt = 0; - if (ZERO.equals(cntr)) - continue; + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); - res.put(e.getKey(), e.getValue()); - } + if (part != null) + locPartCnt++; } - else - res = new HashMap<>(cntrMap); + + CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt); for (int i = 0; i < locParts.length(); i++) { GridDhtLocalPartition part = locParts.get(i); @@ -2073,15 +2094,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part == null) continue; - T2<Long, Long> cntr0 = res.get(part.id()); - Long initCntr = part.initialUpdateCounter(); + long updCntr = part.updateCounter(); + long initCntr = part.initialUpdateCounter(); - if (cntr0 == null || initCntr >= cntr0.get1()) { - if (skipZeros && initCntr == 0L && part.updateCounter() == 0L) - continue; + if (initCntr == 0L && updCntr == 0L) + continue; - res.put(part.id(), new T2<>(initCntr, part.updateCounter())); - } + res.add(part.id(), initCntr, updCntr); } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java new file mode 100644 index 0000000..0f0d62d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Arrays; + +/** + * + */ +public class CachePartitionFullCountersMap { + /** */ + public static final CachePartitionFullCountersMap EMPTY = new CachePartitionFullCountersMap(); + + /** */ + private long[] initialUpdCntrs; + + /** */ + private long[] updCntrs; + + /** + * + */ + public CachePartitionFullCountersMap() { + // Empty map. + } + + /** + * @param other Map to copy. + */ + public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) { + initialUpdCntrs = Arrays.copyOf(other.initialUpdCntrs, other.initialUpdCntrs.length); + updCntrs = Arrays.copyOf(other.updCntrs, other.updCntrs.length); + } + + /** + * @param partsCnt Total number of partitions. + */ + public CachePartitionFullCountersMap(int partsCnt) { + initialUpdCntrs = new long[partsCnt]; + updCntrs = new long[partsCnt]; + } + + /** + * Gets an initial update counter by the partition ID. + * + * @param p Partition ID. + * @return Initial update counter for the partition with the given ID. + */ + public long initialUpdateCounter(int p) { + return initialUpdCntrs[p]; + } + + /** + * Gets an update counter by the partition ID. + * + * @param p Partition ID. + * @return Update counter for the partition with the given ID. + */ + public long updateCounter(int p) { + return updCntrs[p]; + } + + /** + * Sets an initial update counter by the partition ID. + * + * @param p Partition ID. + * @param initialUpdCntr Initial update counter to set. + */ + public void initialUpdateCounter(int p, long initialUpdCntr) { + initialUpdCntrs[p] = initialUpdCntr; + } + + /** + * Sets an update counter by the partition ID. + * + * @param p Partition ID. + * @param updCntr Update counter to set. + */ + public void updateCounter(int p, long updCntr) { + updCntrs[p] = updCntr; + } + + /** + * Clears full counters map. + */ + public void clear() { + Arrays.fill(initialUpdCntrs, 0); + Arrays.fill(updCntrs, 0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java new file mode 100644 index 0000000..c6578d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public class CachePartitionPartialCountersMap { + /** */ + public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap(); + + /** */ + private int[] partIds; + + /** */ + private long[] initialUpdCntrs; + + /** */ + private long[] updCntrs; + + /** */ + private int curIdx; + + /** */ + private CachePartitionPartialCountersMap() { + // Empty map. + } + + /** + * @param partsCnt Total number of partitions will be stored in the partial map. + */ + public CachePartitionPartialCountersMap(int partsCnt) { + partIds = new int[partsCnt]; + initialUpdCntrs = new long[partsCnt]; + updCntrs = new long[partsCnt]; + } + + /** + * @return Total number of partitions added to the map. + */ + public int size() { + return curIdx; + } + + /** + * Adds partition counters for a partition with the given ID. + * + * @param partId Partition ID to add. + * @param initialUpdCntr Partition initial update counter. + * @param updCntr Partition update counter. + */ + public void add(int partId, long initialUpdCntr, long updCntr) { + if (curIdx > 0) { + if (partIds[curIdx - 1] >= partId) + throw new IllegalArgumentException("Adding a partition in the wrong order " + + "[prevPart=" + partIds[curIdx - 1] + ", partId=" + partId + ']'); + } + + if (curIdx == partIds.length) + throw new IllegalStateException("Adding more partitions than reserved: " + partIds.length); + + partIds[curIdx] = partId; + initialUpdCntrs[curIdx] = initialUpdCntr; + updCntrs[curIdx] = updCntr; + + curIdx++; + } + + /** + * @param partId Partition ID to search. + * @return Partition index in the array. + */ + public int partitionIndex(int partId) { + return Arrays.binarySearch(partIds, 0, curIdx, partId); + } + + /** + * Gets partition ID saved at the given index. + * + * @param idx Index to get value from. + * @return Partition ID. + */ + public int partitionAt(int idx) { + return partIds[idx]; + } + + /** + * Gets initial update counter saved at the given index. + * + * @param idx Index to get value from. + * @return Initial update counter. + */ + public long initialUpdateCounterAt(int idx) { + return initialUpdCntrs[idx]; + } + + /** + * Gets update counter saved at the given index. + * + * @param idx Index to get value from. + * @return Update counter. + */ + public long updateCounterAt(int idx) { + return updCntrs[idx]; + } + + + /** + * @param cntrsMap Partial local counters map. + * @return Partition ID to partition counters map. + */ + public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) { + if (cntrsMap.size() == 0) + return Collections.emptyMap(); + + Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size()); + + for (int idx = 0; idx < cntrsMap.size(); idx++) + res.put(cntrsMap.partitionAt(idx), + new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx))); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 441952d..cae3ce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -19,11 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.Map; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -92,12 +90,6 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** - * @param grpId Cache group ID. - * @return Parition update counters. - */ - public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId); - - /** * @return Last used version among all nodes. */ @Nullable public GridCacheVersion lastVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 609021b..6789718 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -624,7 +624,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (updateTop && clientTop != null) { top.update(topologyVersion(), clientTop.partitionMap(true), - clientTop.updateCounters(false), + clientTop.fullUpdateCounters(), Collections.<Integer>emptySet(), null); } @@ -1114,7 +1114,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionsSingleMessage msg; - // Reset lost partition before send local partition to coordinator. + // Reset lost partitions before sending local partitions to coordinator. if (exchActions != null) { Set<String> caches = exchActions.cachesToResetLostPartitions(); @@ -1524,10 +1524,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, Long> minCntrs = new HashMap<>(); for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - assert e.getValue().partitionUpdateCounters(top.groupId()) != null; + CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId()); - for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { - int p = e0.getKey(); + assert nodeCntrs != null; + + for (int i = 0; i < nodeCntrs.size(); i++) { + int p = nodeCntrs.partitionAt(i); UUID uuid = e.getKey(); @@ -1536,10 +1538,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING) continue; - Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2(); - - if (cntr == null) - cntr = 0L; + long cntr = state == GridDhtPartitionState.MOVING ? + nodeCntrs.initialUpdateCounterAt(i) : + nodeCntrs.updateCounterAt(i); Long minCntr = minCntrs.get(p); @@ -1555,6 +1556,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte maxCntrs.put(p, new CounterWithNodes(cntr, uuid)); else if (cntr == maxCntr.cnt) maxCntr.nodes.add(uuid); + } } @@ -1728,10 +1730,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionTopology top = grp != null ? grp.topology() : cctx.exchange().clientTopology(grpId, this); - Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId); + CachePartitionPartialCountersMap cntrs = msg0.partitionUpdateCounters(grpId); if (cntrs != null) - top.applyUpdateCounters(cntrs); + top.collectUpdateCounters(cntrs); } } } @@ -1965,7 +1967,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); - Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId); + CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId); CacheGroupContext grp = cctx.cache().cacheGroup(grpId); http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index acc4dbe..ef3a58f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -187,7 +186,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { + public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) { if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); @@ -198,14 +197,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { - if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); + public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) { + CachePartitionFullCountersMap res = partCntrs == null ? null : partCntrs.get(grpId); - return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); - } - - return Collections.emptyMap(); + return res != null ? res : CachePartitionFullCountersMap.EMPTY; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b4d25c4..3c11fa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -17,11 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Map; -import java.util.HashMap; +import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collections; -import java.io.Externalizable; +import java.util.HashMap; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -60,7 +59,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Partitions update counters. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs; + private Map<Integer, CachePartitionPartialCountersMap> partCntrs; /** Serialized partitions counters. */ private byte[] partCntrsBytes; @@ -150,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { + public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) { if (partCntrs == null) partCntrs = new HashMap<>(); @@ -161,14 +160,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { - if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); - - return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); - } + public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) { + CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId); - return Collections.emptyMap(); + return res == null ? CachePartitionPartialCountersMap.EMPTY : res; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index 4b80ee0..5fb22a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -19,9 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Map; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -53,11 +50,6 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes } /** {@inheritDoc} */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java index 9db80ae..0124e80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java @@ -19,10 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Serializable; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.ignite.internal.util.typedef.T2; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap.EMPTY; /** * Partition counters map. @@ -32,13 +32,13 @@ public class IgniteDhtPartitionCountersMap implements Serializable { private static final long serialVersionUID = 0L; /** */ - private Map<Integer, Map<Integer, T2<Long, Long>>> map; + private Map<Integer, CachePartitionFullCountersMap> map; /** * @param cacheId Cache ID. * @param cntrMap Counters map. */ - public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) { if (map == null) map = new HashMap<>(); @@ -50,14 +50,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable { * @param cacheId Cache ID. * @return Counters map. */ - public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) { + public synchronized CachePartitionFullCountersMap get(int cacheId) { if (map == null) map = new HashMap<>(); - Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId); + CachePartitionFullCountersMap cntrMap = map.get(cacheId); if (cntrMap == null) - return Collections.emptyMap(); + return EMPTY; return cntrMap; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index ed6eee2..83a9f55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1167,7 +1167,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public Long initialUpdateCounter() { + @Override public long initialUpdateCounter() { try { CacheDataStore delegate0 = init0(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 7062353..be958c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -93,6 +93,7 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap; import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK; import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION; @@ -202,7 +203,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheContext cctx = interCache != null ? interCache.context() : null; if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false)); + cntrsPerNode.put(ctx.localNodeId(), + toCountersMap(cctx.topology().localUpdateCounters())); routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } @@ -1070,7 +1072,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); if (cache != null && !cache.isLocal() && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false)); + req.addUpdateCounters(ctx.localNodeId(), + toCountersMap(cache.context().topology().localUpdateCounters())); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 43069cd..39c31ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -519,11 +520,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME); - Map<Integer, T2<Long, Long>> act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().updateCounters(false); + CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().localUpdateCounters(); for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { - if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) - assertEquals(e.getValue(), act.get(e.getKey()).get2()); + if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) { + int partIdx = act.partitionIndex(e.getKey()); + + assertEquals(e.getValue(), (Long)act.updateCounterAt(partIdx)); + } } } }
