ignite-4154
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d4987a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d4987a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d4987a9 Branch: refs/heads/ignite-4154 Commit: 5d4987a9387fa2906cf8608bbbab0dc114432eb7 Parents: dc92038 Author: sboikov <[email protected]> Authored: Wed Nov 9 10:59:29 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 9 10:59:29 2016 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 62 +-- .../affinity/GridAffinityProcessor.java | 81 ++++ .../cache/CacheAffinitySharedManager.java | 27 +- .../cache/DynamicCacheChangeBatch.java | 2 +- .../GridCachePartitionExchangeManager.java | 275 ++++++++++--- .../dht/GridClientPartitionTopology.java | 33 +- .../dht/GridDhtPartitionTopology.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 27 +- .../dht/preloader/GridDhtPartitionFullMap.java | 18 + .../dht/preloader/GridDhtPartitionMap2.java | 62 ++- .../GridDhtPartitionsExchangeFuture.java | 92 +---- .../preloader/GridDhtPartitionsFullMessage.java | 89 ++++- .../GridDhtPartitionsSingleMessage.java | 68 +++- .../continuous/GridContinuousProcessor.java | 4 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 26 +- ...CacheExchangeMessageDuplicatedStateTest.java | 386 +++++++++++++++++++ .../GridCacheSyncReplicatedPreloadSelfTest.java | 3 - .../IgniteCacheSyncRebalanceModeSelfTest.java | 4 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++- .../testsuites/IgniteCacheTestSuite2.java | 3 + 21 files changed, 1138 insertions(+), 263 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 9166b31..a388c7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -108,7 +108,7 @@ public class GridAffinityAssignmentCache { private final AtomicInteger fullHistSize = new AtomicInteger(); /** */ - private final SimilarAffinityKey similarAffKey; + private final Object similarAffKey; /** * Constructs affinity cached calculations. @@ -147,9 +147,14 @@ public class GridAffinityAssignmentCache { affCache = new ConcurrentSkipListMap<>(); head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); - similarAffKey = new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, partsCnt); + similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt); + + assert similarAffKey != null; } + /** + * @return Key to find caches with similar affinity. + */ public Object similarAffinityKey() { return similarAffKey; } @@ -612,57 +617,4 @@ public class GridAffinityAssignmentCache { return S.toString(AffinityReadyFuture.class, this); } } - - /** - * - */ - private static class SimilarAffinityKey { - /** */ - private final int backups; - - /** */ - private final Class<?> affFuncCls; - - /** */ - private final Class<?> filterCls; - - /** */ - private final int partsCnt; - - /** */ - private final int hash; - - public SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { - this.backups = backups; - this.affFuncCls = affFuncCls; - this.filterCls = filterCls; - this.partsCnt = partsCnt; - - int hash = backups; - hash = 31 * hash + affFuncCls.hashCode(); - hash = 31 * hash + filterCls.hashCode(); - hash= 31 * hash + partsCnt; - - this.hash = hash; - } - - @Override public int hashCode() { - return hash; - } - - @Override public boolean equals(Object o) { - if (o == this) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - SimilarAffinityKey key = (SimilarAffinityKey)o; - - return backups == key.backups && - affFuncCls == key.affFuncCls && - filterCls == key.filterCls && - partsCnt == key.partsCnt; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index d3783f0..b9182ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -568,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return nodes.iterator().next(); } + /** + * @param aff Affinity function. + * @param nodeFilter Node class. + * @param backups Number of backups. + * @param parts Number of partitions. + * @return Key to find caches with similar affinity. + */ + public Object similaryAffinityKey(AffinityFunction aff, + IgnitePredicate<ClusterNode> nodeFilter, + int backups, + int parts) { + return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -966,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return aff; } } + + /** + * + */ + private static class SimilarAffinityKey { + /** */ + private final int backups; + + /** */ + private final Class<?> affFuncCls; + + /** */ + private final Class<?> filterCls; + + /** */ + private final int partsCnt; + + /** */ + private final int hash; + + /** + * @param affFuncCls Affinity function class. + * @param filterCls Node filter class. + * @param backups Number of backups. + * @param partsCnt Number of partitions. + */ + SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { + this.backups = backups; + this.affFuncCls = affFuncCls; + this.filterCls = filterCls; + this.partsCnt = partsCnt; + + int hash = backups; + hash = 31 * hash + affFuncCls.hashCode(); + hash = 31 * hash + filterCls.hashCode(); + hash= 31 * hash + partsCnt; + + this.hash = hash; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return hash; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + SimilarAffinityKey key = (SimilarAffinityKey)o; + + return backups == key.backups && + affFuncCls == key.affFuncCls && + filterCls == key.filterCls && + partsCnt == key.partsCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SimilarAffinityKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 88f1f97..2890887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param node Event node. * @param topVer Topology version. */ - public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. registeredCaches.clear(); @@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param msg Customer message. * @return {@code True} if minor topology version should be increased. */ - public boolean onCustomEvent(CacheAffinityChangeMessage msg) { + boolean onCustomEvent(CacheAffinityChangeMessage msg) { assert lateAffAssign : msg; if (msg.exchangeId() != null) { @@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param top Topology. * @param checkCacheId Cache ID. */ - public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { + void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { if (!lateAffAssign) return; @@ -1246,6 +1246,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. + * @param affCache Already calculated assignments (to reduce data stored in history). * @throws IgniteCheckedException If failed. */ private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, @@ -1303,10 +1304,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param aff - * @param assign - * @param affCache - * @return + * @param aff Assignment cache. + * @param assign Assignment. + * @param affCache Assignments already calculated for other caches. + * @return Assignment. */ private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, List<List<ClusterNode>> assign, @@ -1393,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ - public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) + private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -1580,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity cache. * @param initAff Existing affinity cache. */ - public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder(boolean rebalanceEnabled, + GridAffinityAssignmentCache aff, + @Nullable GridAffinityAssignmentCache initAff) { this.aff = aff; if (initAff != null) @@ -1632,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created cache is started on coordinator. */ - class CacheHolder1 extends CacheHolder { + private class CacheHolder1 extends CacheHolder { /** */ private final GridCacheContext cctx; @@ -1640,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param cctx Cache context. * @param initAff Current affinity. */ - public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff); assert !cctx.isLocal() : cctx.name(); @@ -1677,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created if cache is not started on coordinator. */ - static class CacheHolder2 extends CacheHolder { + private static class CacheHolder2 extends CacheHolder { /** */ private final GridCacheSharedContext cctx; http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 4dcff9b..39e1c50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -86,7 +86,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { * @param clientNodes Client nodes map. */ public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) { - this.clientNodes = clientNodes; + this.clientNodes = null;//clientNodes; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a81bf0f..928500f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,6 +21,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -44,7 +45,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -56,6 +59,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -72,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -81,6 +86,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; @@ -532,8 +538,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (top != null) return top; + Object affKey = null; + + DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + + if (desc != null) { + CacheConfiguration ccfg = desc.cacheConfiguration(); + + AffinityFunction aff = ccfg.getAffinity(); + + affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff, + ccfg.getNodeFilter(), + ccfg.getBackups(), + aff.partitions()); + } + GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId, - top = new GridClientPartitionTopology(cctx, cacheId, exchFut)); + top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey)); return old != null ? old : top; } @@ -762,43 +783,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param nodes Nodes. * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); - - boolean useOldApi = false; - boolean compress = true; - - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { - useOldApi = true; - compress = false; - - break; - } - else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) - compress = false; - } - - m.compress(compress); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started()) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) { - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence(), - locMap); - } - - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); - } - } - - // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); + private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@ -821,32 +807,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. - * @param id ID. + * @param nodes Target nodes. + * @param exchId Non-null exchange ID if message is created for exchange. + * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, - cctx.kernalContext().clientNode(), - cctx.versions().last(), - node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); + public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes, + @Nullable GridDhtPartitionExchangeId exchId, + @Nullable GridCacheVersion lastVer, + boolean compress) { + GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, + lastVer, + exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); + + boolean useOldApi = false; + + if (nodes != null) { + for (ClusterNode node : nodes) { + if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { + useOldApi = true; + compress = false; + + break; + } + else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) + compress = false; + } + } + + m.compress(compress); + + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + boolean ready; - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + if (exchId != null) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0; + } + else + ready = cacheCtx.started(); - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); + if (ready) { + GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); + + if (useOldApi) { + locMap = new GridDhtPartitionFullMap(locMap.nodeId(), + locMap.nodeOrder(), + locMap.updateSequence(), + locMap); + } + + addFullPartitionsMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (exchId != null) + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + } } } - for (GridClientPartitionTopology top : clientTops.values()) { - GridDhtPartitionMap2 locMap = top.localPartitionMap(); + // It is important that client topologies be added after contexts. + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { + GridDhtPartitionFullMap map = top.partitionMap(true); + + addFullPartitionsMap(m, + dupData, + compress, + top.cacheId(), + map, + top.similarAffinityKey()); + + if (exchId != null) + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + } + + return m; + } + + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ + private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionFullMap map, + Object affKey) { + Integer dupDataCache = null; - m.addLocalPartitionMap(top.cacheId(), locMap); + if (compress && affKey != null && !m.containsCache(cacheId)) { + T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().partitionStateEquals(map)) { + GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(), + map.nodeOrder(), + map.updateSequence()); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet()) + map0.put(e.getKey(), e.getValue().emptyCopy()); + + map = map0; + + dupDataCache = state0.get1(); + } + else + dupData.put(affKey, new T2<>(cacheId, map)); } + m.addFullPartitionsMap(cacheId, map, dupDataCache); + } + + /** + * @param node Node. + * @param id ID. + */ + private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, + id, + cctx.kernalContext().clientNode(), + false); + if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@ -864,6 +958,81 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param targetNode Target node. + * @param exchangeId ID. + * @param clientOnlyExchange Client exchange flag. + * @param sndCounters {@code True} if need send partition update counters. + * @return Message. + */ + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode, + @Nullable GridDhtPartitionExchangeId exchangeId, + boolean clientOnlyExchange, + boolean sndCounters) + { + boolean compress = + targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0; + + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, + clientOnlyExchange, + cctx.versions().last(), + compress); + + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + + if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) + locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + + addPartitionMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (sndCounters) + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + } + } + + return m; + } + + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ + private void addPartitionMap(GridDhtPartitionsSingleMessage m, + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionMap2 map, + Object affKey) { + Integer dupDataCache = null; + + if (compress) { + T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().equals(map.map())) { + dupDataCache = state0.get1(); + + map.map(U.<Integer, GridDhtPartitionState>newHashMap(0)); + } + else + dupData.put(affKey, new T2<>(cacheId, map.map())); + } + + m.addLocalPartitionMap(cacheId, map, dupDataCache); + } + + /** * @param nodeId Cause node ID. * @param topVer Topology version. * @param evt Event type. @@ -880,7 +1049,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param affChangeMsg Affinity change message. * @return Exchange future. */ - GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, + private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 58933b7..5efb317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; + /** */ + private static final Long ZERO = 0L; + /** Cache shared context. */ private GridCacheSharedContext cctx; @@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Partition update counters. */ private Map<Integer, Long> cntrMap = new HashMap<>(); + /** */ + private final Object similarAffKey; + /** * @param cctx Context. * @param cacheId Cache ID. * @param exchFut Exchange ID. + * @param similarAffKey Key to find caches with similar affinity. */ public GridClientPartitionTopology( GridCacheSharedContext cctx, int cacheId, - GridDhtPartitionsExchangeFuture exchFut + GridDhtPartitionsExchangeFuture exchFut, + Object similarAffKey ) { this.cctx = cctx; this.cacheId = cacheId; + this.similarAffKey = similarAffKey; topVer = exchFut.topologyVersion(); @@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** + * @return Key to find caches with similar affinity. + */ + @Nullable public Object similarAffinityKey() { + return similarAffKey; + } + + /** * @return Full map string representation. */ @SuppressWarnings( {"ConstantConditions"}) @@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - return new HashMap<>(cntrMap); + if (skipZeros) { + Map<Integer, Long> res = U.newHashMap(cntrMap.size()); + + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + if (!e.getValue().equals(ZERO)) + res.put(e.getKey(), e.getValue()); + } + + return res; + } + else + return new HashMap<>(cntrMap); } finally { lock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 6e9b907..4ae4e47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology { @Nullable Map<Integer, Long> cntrMap); /** + * @param skipZeros If {@code true} then filters out zero counters. * @return Partition update counters. */ - public Map<Integer, Long> updateCounters(); + public Map<Integer, Long> updateCounters(boolean skipZeros); /** * @param part Partition to own. http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 871a084..f3751ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; + /** */ + private static final Long ZERO = 0L; + /** Context. */ private final GridCacheContext<?, ?> cctx; @@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - Map<Integer, Long> res = new HashMap<>(cntrMap); + Map<Integer, Long> res; + + if (skipZeros) { + res = U.newHashMap(cntrMap.size()); + + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = e.getValue(); + + if (ZERO.equals(cntr)) + continue; + + res.put(e.getKey(), cntr); + } + } + else + res = new HashMap<>(cntrMap); for (int i = 0; i < locParts.length; i++) { GridDhtLocalPartition part = locParts[i]; @@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { continue; Long cntr0 = res.get(part.id()); - Long cntr1 = part.updateCounter(); + long cntr1 = part.updateCounter(); + + if (skipZeros && cntr1 == 0L) + continue; if (cntr0 == null || cntr1 > cntr0) res.put(part.id(), cntr1); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 498d492..8f5ad17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> } /** + * @param fullMap Map. + * @return {@code True} if this map and given map contain the same data. + */ + public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { + if (size() != fullMap.size()) + return false; + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) { + GridDhtPartitionMap2 m = fullMap.get(e.getKey()); + + if (m == null || !m.map().equals(e.getValue().map())) + return false; + } + + return true; + } + + /** * @param updateSeq New update sequence value. * @return Old update sequence value. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 15b5a2e..dc308ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -63,25 +63,15 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E /** * @param nodeId Node ID. * @param updateSeq Update sequence number. - */ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq) { - assert nodeId != null; - assert updateSeq > 0; - - this.nodeId = nodeId; - this.updateSeq = updateSeq; - - map = new HashMap<>(); - } - - /** - * @param nodeId Node ID. - * @param updateSeq Update sequence number. + * @param top Topology version. * @param m Map to copy. * @param onlyActive If {@code true}, then only active states will be included. */ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, - Map<Integer, GridDhtPartitionState> m, boolean onlyActive) { + public GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> m, + boolean onlyActive) { assert nodeId != null; assert updateSeq > 0; @@ -100,6 +90,36 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } /** + * @param nodeId Node ID. + * @param updateSeq Update sequence number. + * @param top Topology version. + * @param map Map. + * @param moving Number of moving partitions. + */ + private GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> map, + int moving) { + this.nodeId = nodeId; + this.updateSeq = updateSeq; + this.top = top; + this.map = map; + this.moving = moving; + } + + /** + * @return Copy with empty partition state map. + */ + public GridDhtPartitionMap2 emptyCopy() { + return new GridDhtPartitionMap2(nodeId, + updateSeq, + top, + U.<Integer, GridDhtPartitionState>newHashMap(0), + moving); + } + + /** * Empty constructor required for {@link Externalizable}. */ public GridDhtPartitionMap2() { @@ -174,6 +194,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } /** + * @param map Partition states map. + */ + public void map(Map<Integer, GridDhtPartitionState> map) { + this.map = map; + } + + /** * @return Node ID. */ public UUID nodeId() { @@ -277,9 +304,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E long ver = in.readLong(); int minorVer = in.readInt(); - if (ver != 0) { + if (ver != 0) top = new AffinityTopologyVersion(ver, minorVer); - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6a17583..a79aba3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); if (updateTop && clientTop != null) - cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters()); + cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); } top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); @@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (top.cacheId() == cacheCtx.cacheId()) { cacheCtx.topology().update(exchId, top.partitionMap(true), - top.updateCounters()); + top.updateCounters(false)); break; } @@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (!centralizedAff) - sendLocalPartitions(crd, exchId); + sendLocalPartitions(crd); initDone(); @@ -928,28 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param node Node. - * @param id ID. * @throws IgniteCheckedException If failed. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) + private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node, + exchangeId(), clientOnlyExchange, - cctx.versions().last(), - node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); - - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); - - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - - m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } + true); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -965,60 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param nodes Target nodes. - * @return Message; + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. */ - private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) { + private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) { GridCacheVersion last = lastVer.get(); - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(), + return cctx.exchange().createPartitionsFullMessage(nodes, + exchangeId(), last != null ? last : cctx.versions().last(), - topologyVersion()); - - boolean useOldApi = false; - boolean compress = true; - - if (nodes != null) { - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { - useOldApi = true; - compress = false; - - break; - } - else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) - compress = false; - } - } - - m.compress(compress); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); - - boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0; - - if (ready) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap); - - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); - - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } - } - - // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); - - m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); - } - - return m; + compress); } /** @@ -1026,7 +970,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes); + GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true); if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + @@ -1040,7 +984,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void sendPartitions(ClusterNode oldestNode) { try { - sendLocalPartitions(oldestNode, exchId); + sendLocalPartitions(oldestNode); } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) @@ -1244,7 +1188,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(null); + GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 63d63e2..3d2d380 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -22,14 +22,19 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; + import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; @@ -48,6 +53,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private Map<Integer, GridDhtPartitionFullMap> parts; /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + + /** */ private byte[] partsBytes; /** Partitions update counters. */ @@ -63,7 +72,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ @GridDirectTransient - private boolean compress; + private transient boolean compress; /** * Required by {@link Externalizable}. @@ -87,6 +96,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa this.topVer = topVer; } + /** + * @param compress {@code True} if it is possible to use compression for message. + */ public void compress(boolean compress) { this.compress = compress; } @@ -100,14 +112,33 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param cacheId Cache ID. + * @return {@code True} if message contains full map for given cache. + */ + public boolean containsCache(int cacheId) { + return parts != null && parts.containsKey(cacheId); + } + + /** + * @param cacheId Cache ID. * @param fullMap Full partitions map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); - if (!parts.containsKey(cacheId)) + if (!parts.containsKey(cacheId)) { parts.put(cacheId, fullMap); + + if (dupDataCache != null) { + assert parts.containsKey(dupDataCache); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } + } } /** @@ -197,6 +228,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionFullMap map1 = parts.get(e.getKey()); + + assert map1 != null : e.getKey(); + + GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + + assert map2 != null : e.getValue(); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) { + GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey()); + + assert partMap1 != null && partMap1.map().isEmpty() : partMap1; + assert !partMap1.hasMovingPartitions() : partMap1; + + GridDhtPartitionMap2 partMap2 = e0.getValue(); + + assert partMap2 != null; + + for (Map.Entry<Integer, GridDhtPartitionState> stateEntry : partMap2.entrySet()) + partMap1.put(stateEntry.getKey(), stateEntry.getValue()); + } + } + } } if (parts == null) @@ -229,18 +288,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 6: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 8: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 9: if (!writer.writeMessage("topVer", topVer)) return false; @@ -263,7 +328,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 6: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -271,7 +336,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 7: - partsBytes = reader.readByteArray("partsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -279,6 +344,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 8: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -298,7 +371,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index a37e092..416d298 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -23,12 +23,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectTransient private Map<Integer, GridDhtPartitionMap2> parts; + /** */ + @GridDirectMap(keyType = Integer.class, valueType = Integer.class) + private Map<Integer, Integer> dupPartsData; + /** Serialized partitions. */ private byte[] partsBytes; @@ -60,7 +68,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes private boolean client; /** */ - private boolean compress; + @GridDirectTransient + private transient boolean compress; /** * Required by {@link Externalizable}. @@ -73,6 +82,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param exchId Exchange ID. * @param client Client message flag. * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, @@ -92,16 +102,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * Adds partition map to this message. - * * @param cacheId Cache ID to add local partition for. * @param locMap Local partition map. + * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap) { + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); parts.put(cacheId, locMap); + + if (dupDataCache != null) { + assert F.isEmpty(locMap.map()); + + if (dupPartsData == null) + dupPartsData = new HashMap<>(); + + dupPartsData.put(cacheId, dupDataCache); + } } /** @@ -183,7 +201,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (compressed()) parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - parts =U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } if (partCntrsBytes != null && partCntrs == null) { @@ -192,6 +210,26 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes else partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + + if (dupPartsData != null) { + assert parts != null; + + for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { + GridDhtPartitionMap2 map1 = parts.get(e.getKey()); + + assert map1 != null : e.getKey(); + assert F.isEmpty(map1.map()); + assert !map1.hasMovingPartitions(); + + GridDhtPartitionMap2 map2 = parts.get(e.getValue()); + + assert map2 != null : e.getValue(); + assert map2.map() != null; + + for (Map.Entry<Integer, GridDhtPartitionState> e0 : map2.map().entrySet()) + map1.put(e0.getKey(), e0.getValue()); + } + } } /** {@inheritDoc} */ @@ -216,12 +254,18 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 7: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + return false; + + writer.incrementState(); + + case 9: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -252,7 +296,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 7: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -260,6 +304,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 8: + partCntrsBytes = reader.readByteArray("partCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -279,7 +331,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 3a559e7..9fd9b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheContext cctx = interCache != null ? interCache.context() : null; if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); + cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false)); routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } @@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); if (cache != null && !cache.isLocal() && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); + req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index f929121..f8e38d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - TcpDiscoveryAbstractMessage msg = null; + TcpDiscoveryAbstractMessage msg; while (!Thread.currentThread().isInterrupted()) { Socket sock; @@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl { continue; } - if (msg == null) - msg = queue.poll(); + msg = queue.poll(); if (msg == null) { mux.wait(); @@ -1121,19 +1120,7 @@ class ClientImpl extends TcpDiscoveryImpl { } } } - catch (IOException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to send node left message (will stop anyway) " + - "[sock=" + sock + ", msg=" + msg + ']', e); - - U.closeQuiet(sock); - - synchronized (mux) { - if (sock == this.sock) - this.sock = null; // Connection has dead. - } - } - catch (IgniteCheckedException e) { + catch (Exception e) { if (spi.getSpiContext().isStopping()) { if (log.isDebugEnabled()) log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']'); @@ -1141,7 +1128,12 @@ class ClientImpl extends TcpDiscoveryImpl { else U.error(log, "Failed to send message: " + msg, e); - msg = null; + U.closeQuiet(sock); + + synchronized (mux) { + if (sock == this.sock) + this.sock = null; // Connection has dead. + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java new file mode 100644 index 0000000..d07fdd3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String AFF1_CACHE1 = "a1c1"; + + /** */ + private static final String AFF1_CACHE2 = "a1c2"; + + /** */ + private static final String AFF2_CACHE1 = "a2c1"; + + /** */ + private static final String AFF2_CACHE2 = "a2c2"; + + /** */ + private static final String AFF3_CACHE1 = "a3c1"; + + /** */ + private static final String AFF4_FILTER_CACHE1 = "a4c1"; + + /** */ + private static final String AFF4_FILTER_CACHE2 = "a4c2"; + + /** */ + private static final String AFF5_FILTER_CACHE1 = "a5c1"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class); + + cfg.setCommunicationSpi(commSpi); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE1); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE2); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE1); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE2); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF3_CACHE1); + ccfg.setBackups(3); + + RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64); + ccfg.setAffinity(aff); + + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE2); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF5_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + } + + /** + * @throws Exception If failed. + */ + public void testExchangeMessages() throws Exception { + ignite(0); + + startGrid(1); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + startGrid(2); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + client = true; + + startGrid(3); + + awaitPartitionMapExchange(); + + checkMessages(0, false); + + stopGrid(0); + + awaitPartitionMapExchange(); + + checkMessages(1, true); + } + + /** + * @param crdIdx Coordinator node index. + * @param checkSingle {@code True} if need check single messages. + */ + private void checkMessages(int crdIdx, boolean checkSingle) { + checkFullMessages(crdIdx); + + if (checkSingle) + checkSingleMessages(crdIdx); + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkFullMessages(int crdIdx) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage); + + checkFullMessage((GridDhtPartitionsFullMessage)msg); + } + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkSingleMessages(int crdIdx) { + int cnt = 0; + + for (Ignite ignite : Ignition.allGrids()) { + if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode()) + continue; + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage); + + checkSingleMessage((GridDhtPartitionsSingleMessage)msg); + } + + cnt++; + } + + assertTrue(cnt > 0); + } + + /** + * @param msg Message. + */ + private void checkFullMessage(GridDhtPartitionsFullMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param msg Message. + */ + private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkFullMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsFullMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions(); + + GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId); + + for (GridDhtPartitionMap2 map : emptyFullMap.values()) + assertEquals(0, map.map().size()); + + GridDhtPartitionFullMap fullMap = parts.get(dupCacheId); + + for (GridDhtPartitionMap2 map : fullMap.values()) + assertFalse(map.map().isEmpty()); + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkSingleMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsSingleMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionMap2> parts = msg.partitions(); + + GridDhtPartitionMap2 emptyMap = parts.get(cacheId); + + assertEquals(0, emptyMap.map().size()); + + GridDhtPartitionMap2 map = parts.get(dupCacheId); + + assertFalse(map.map().isEmpty()); + } + + /** + * + */ + private static class TestNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + // Do not start cache on coordinator. + return node.order() > 1; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index 87d02a5..cde6b8d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@ -41,9 +41,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe /** */ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ - private static final boolean DISCO_DEBUG_MODE = false; - /** * Constructs test. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d4987a9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java index 9b0637e..f3942d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheSyncRebalanceModeSelfTest.java @@ -34,7 +34,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; public class IgniteCacheSyncRebalanceModeSelfTest extends GridCommonAbstractTest { /** Entry count. */ public static final int CNT = 100_000; - public static final String STATIC_CACHE_NAME = "static"; + + /** */ + private static final String STATIC_CACHE_NAME = "static"; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
