Repository: ignite Updated Branches: refs/heads/ignite-4154 85475c0c8 -> a099e9117
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a099e911 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a099e911 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a099e911 Branch: refs/heads/ignite-4154 Commit: a099e911703578389526095df8af2019e4f25f58 Parents: 85475c0 Author: sboikov <[email protected]> Authored: Wed Nov 9 15:54:31 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 9 17:24:07 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 24 +++++++++++++++++--- .../dht/preloader/GridDhtPartitionMap2.java | 16 ++++++------- .../GridDhtPartitionsAbstractMessage.java | 6 +++++ .../GridDhtPartitionsExchangeFuture.java | 2 ++ .../preloader/GridDhtPartitionsFullMessage.java | 5 ++-- .../GridDhtPartitionsSingleMessage.java | 1 + 6 files changed, 40 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 928500f..07d1d4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,7 +21,6 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -791,6 +790,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (ClusterNode node : nodes) { try { + assert !node.equals(cctx.localNode()); + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { @@ -998,6 +999,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + for (GridClientPartitionTopology top : clientTops.values()) { + if (m.partitions() != null && m.partitions().containsKey(top.cacheId())) + continue; + + GridDhtPartitionMap2 locMap = top.localPartitionMap(); + + addPartitionMap(m, + dupData, + compress, + top.cacheId(), + locMap, + top.similarAffinityKey()); + + if (sndCounters) + m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + } + return m; } @@ -1010,7 +1028,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param affKey Cache affinity key. */ private void addPartitionMap(GridDhtPartitionsSingleMessage m, - Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData, + Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData, boolean compress, Integer cacheId, GridDhtPartitionMap2 map, @@ -1023,7 +1041,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (state0 != null && state0.get2().equals(map.map())) { dupDataCache = state0.get1(); - map.map(U.<Integer, GridDhtPartitionState>newHashMap(0)); + map = map.emptyCopy(); } else dupData.put(affKey, new T2<>(cacheId, map.map())); http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index dc308ba..724debe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -61,6 +61,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E private volatile int moving; /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionMap2() { + // No-op. + } + + /** * @param nodeId Node ID. * @param updateSeq Update sequence number. * @param top Topology version. @@ -116,14 +123,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E updateSeq, top, U.<Integer, GridDhtPartitionState>newHashMap(0), - moving); - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtPartitionMap2() { - // No-op. + 0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index a3bb5f6..6e69161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -89,10 +89,16 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage return lastVer; } + /** + * @return {@code True} if message data is compressed. + */ protected final boolean compressed() { return (flags & COMPRESSED_FLAG_MASK) != 0; } + /** + * @param compressed {@code True} if message data is compressed. + */ protected final void compressed(boolean compressed) { flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a79aba3..f391265 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -972,6 +972,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true); + assert !nodes.contains(cctx.localNode()); + if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 3d2d380..053b0aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -234,12 +234,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { GridDhtPartitionFullMap map1 = parts.get(e.getKey()); - - assert map1 != null : e.getKey(); - GridDhtPartitionFullMap map2 = parts.get(e.getValue()); + assert map1 != null : e.getKey(); assert map2 != null : e.getValue(); + assert map1.size() == map2.size(); for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) { GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a099e911/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 416d298..f135ea6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -114,6 +114,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (dupDataCache != null) { assert F.isEmpty(locMap.map()); + assert parts.containsKey(dupDataCache); if (dupPartsData == null) dupPartsData = new HashMap<>();
