ignite-4154 Optimize amount of data stored in discovery history Discovery history optimizations: - remove discarded messages from discovery pending messages - remove duplicated data from TcpDiscoveryNodeAddedMessage.oldNodesDiscoData - do not store unnecessary data in discovery EnsuredMessageHistory - use special property for EnsuredMessageHistory size instead of IGNITE_DISCOVERY_HISTORY_SIZE Affinity history optimizations: - do not store calculated primary/backup maps in history - try to save the same assignment instance for caches with similar affinity Exchange messages optimizations: - do not send duplicated partition state maps for caches with similar affinity - use zip compression for data sent in exchange messages
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7128a395 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7128a395 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7128a395 Branch: refs/heads/ignite-4242 Commit: 7128a395085b60e86436f807b4bdbca83627d41a Parents: 8bb8bdd Author: sboikov <[email protected]> Authored: Fri Nov 11 15:29:38 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 11 15:29:38 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 4 + .../processors/affinity/AffinityAssignment.java | 88 +++++ .../affinity/GridAffinityAssignment.java | 8 +- .../affinity/GridAffinityAssignmentCache.java | 35 +- .../affinity/GridAffinityProcessor.java | 89 ++++- .../processors/affinity/GridAffinityUtils.java | 8 +- .../affinity/HistoryAffinityAssignment.java | 169 ++++++++ .../cache/CacheAffinitySharedManager.java | 57 ++- .../cache/DynamicCacheChangeBatch.java | 7 + .../cache/GridCacheAffinityManager.java | 6 +- .../GridCachePartitionExchangeManager.java | 284 ++++++++++++-- .../processors/cache/GridCacheProcessor.java | 5 +- .../dht/GridClientPartitionTopology.java | 33 +- .../dht/GridDhtPartitionTopology.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 31 +- .../dht/preloader/GridDhtPartitionFullMap.java | 18 + .../dht/preloader/GridDhtPartitionMap2.java | 53 ++- .../GridDhtPartitionsAbstractMessage.java | 40 +- .../GridDhtPartitionsExchangeFuture.java | 84 +--- .../preloader/GridDhtPartitionsFullMessage.java | 150 ++++++- .../GridDhtPartitionsSingleMessage.java | 132 ++++++- .../GridDhtPartitionsSingleRequest.java | 4 +- .../dht/preloader/GridDhtPreloader.java | 4 +- .../continuous/GridContinuousProcessor.java | 4 +- .../ignite/internal/util/IgniteUtils.java | 64 +++ .../ignite/spi/discovery/tcp/ClientImpl.java | 26 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 234 
+++++++++-- .../TcpDiscoveryNodeAddFinishedMessage.java | 11 + .../messages/TcpDiscoveryNodeAddedMessage.java | 33 +- ...CacheExchangeMessageDuplicatedStateTest.java | 393 +++++++++++++++++++ .../cache/IgniteCachePeekModesAbstractTest.java | 2 +- .../distributed/IgniteCacheGetRestartTest.java | 3 + ...cingDelayedPartitionMapExchangeSelfTest.java | 8 +- .../GridCacheRebalancingSyncSelfTest.java | 18 +- .../GridCacheSyncReplicatedPreloadSelfTest.java | 3 - .../IgniteCacheSyncRebalanceModeSelfTest.java | 4 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../IgniteNoCustomEventsOnNodeStart.java | 7 + .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++- .../junits/common/GridCommonAbstractTest.java | 25 +- .../testsuites/IgniteCacheTestSuite2.java | 3 + .../cache/IgniteCacheOffheapEvictQueryTest.java | 7 + ...lientQueryReplicatedNodeRestartSelfTest.java | 7 + ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 7 + .../query/h2/sql/GridQueryParsingTest.java | 11 +- .../src/test/config/incorrect-store-cache.xml | 2 + .../src/test/config/jdbc-pojo-store-builtin.xml | 3 + .../src/test/config/jdbc-pojo-store-obj.xml | 3 + modules/spring/src/test/config/node.xml | 2 + modules/spring/src/test/config/node1.xml | 2 + .../test/config/pojo-incorrect-store-cache.xml | 2 + modules/spring/src/test/config/store-cache.xml | 2 + modules/spring/src/test/config/store-cache1.xml | 2 + 53 files changed, 2061 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index ab6403f..a75027b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -383,6 +383,10 @@ public final class IgniteSystemProperties { /** Maximum size for discovery messages history. */ public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE"; + /** Maximum number of discovery message history used to support client reconnect. */ + public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE = + "IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE"; + /** Number of cache operation retries in case of topology exceptions. */ public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java new file mode 100644 index 0000000..06207d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cluster.ClusterNode; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Cached affinity calculations. + */ +public interface AffinityAssignment { + /** + * @return {@code True} if related discovery event did not cause affinity assignment change and + * this assignment is just a reference to the previous one. + */ + public boolean clientEventChange(); + + /** + * @return Affinity assignment computed by affinity function. + */ + public List<List<ClusterNode>> idealAssignment(); + + /** + * @return Affinity assignment. + */ + public List<List<ClusterNode>> assignment(); + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion(); + + /** + * Get affinity nodes for partition. + * + * @param part Partition. + * @return Affinity nodes. + */ + public List<ClusterNode> get(int part); + + /** + * Get affinity node IDs for partition. + * + * @param part Partition. + * @return Affinity nodes IDs. + */ + public HashSet<UUID> getIds(int part); + + /** + * @return Nodes having primary partitions assignments. + */ + public Set<ClusterNode> primaryPartitionNodes(); + + /** + * Get primary partitions for specified node ID. + * + * @param nodeId Node ID to get primary partitions for. + * @return Primary partitions for specified node ID. + */ + public Set<Integer> primaryPartitions(UUID nodeId); + + /** + * Get backup partitions for specified node ID. + * + * @param nodeId Node ID to get backup partitions for. + * @return Backup partitions for specified node ID. 
+ */ + public Set<Integer> backupPartitions(UUID nodeId); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 568e4e8..35130a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Cached affinity calculations. */ -public class GridAffinityAssignment implements Serializable { +public class GridAffinityAssignment implements AffinityAssignment, Serializable { /** */ private static final long serialVersionUID = 0L; @@ -86,7 +86,7 @@ public class GridAffinityAssignment implements Serializable { this.topVer = topVer; this.assignment = assignment; - this.idealAssignment = idealAssignment; + this.idealAssignment = idealAssignment.equals(assignment) ? 
assignment : idealAssignment; primary = new HashMap<>(); backup = new HashMap<>(); @@ -274,10 +274,10 @@ public class GridAffinityAssignment implements Serializable { if (o == this) return true; - if (o == null || getClass() != o.getClass()) + if (o == null || !(o instanceof AffinityAssignment)) return false; - return topVer.equals(((GridAffinityAssignment)o).topVer); + return topVer.equals(((AffinityAssignment)o).topologyVersion()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a81b34d..a388c7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache { private final int partsCnt; /** Affinity calculation results cache: topology version => partition => nodes. */ - private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache; + private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache; /** */ private List<List<ClusterNode>> idealAssignment; @@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache { /** Full history size. */ private final AtomicInteger fullHistSize = new AtomicInteger(); + /** */ + private final Object similarAffKey; + /** * Constructs affinity cached calculations. 
* @@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache { { assert ctx != null; assert aff != null; + assert nodeFilter != null; this.ctx = ctx; this.aff = aff; @@ -142,6 +146,17 @@ public class GridAffinityAssignmentCache { partsCnt = aff.partitions(); affCache = new ConcurrentSkipListMap<>(); head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); + + similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt); + + assert similarAffKey != null; + } + + /** + * @return Key to find caches with similar affinity. + */ + public Object similarAffinityKey() { + return similarAffKey; } /** @@ -170,7 +185,7 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); - affCache.put(topVer, assignment); + affCache.put(topVer, new HistoryAffinityAssignment(assignment)); head.set(assignment); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -300,7 +315,7 @@ public class GridAffinityAssignmentCache { GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); - affCache.put(topVer, assignmentCpy); + affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy)); head.set(assignmentCpy); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { @@ -328,7 +343,7 @@ public class GridAffinityAssignmentCache { * @return Affinity assignment. */ public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { - GridAffinityAssignment aff = cachedAffinity(topVer); + AffinityAssignment aff = cachedAffinity(topVer); return aff.assignment(); } @@ -427,7 +442,7 @@ public class GridAffinityAssignmentCache { * @param topVer Topology version. * @return Cached affinity. 
*/ - public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) { + public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) { if (topVer.equals(AffinityTopologyVersion.NONE)) topVer = lastVersion(); else @@ -435,7 +450,7 @@ public class GridAffinityAssignmentCache { assert topVer.topologyVersion() >= 0 : topVer; - GridAffinityAssignment cache = head.get(); + AffinityAssignment cache = head.get(); if (!cache.topologyVersion().equals(topVer)) { cache = affCache.get(topVer); @@ -463,7 +478,7 @@ public class GridAffinityAssignmentCache { * @return {@code True} if primary changed or required affinity version not found in history. */ public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) { - GridAffinityAssignment aff = affCache.get(startVer); + AffinityAssignment aff = affCache.get(startVer); if (aff == null) return false; @@ -475,7 +490,7 @@ public class GridAffinityAssignmentCache { ClusterNode primary = nodes.get(0); - for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) { + for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) { List<ClusterNode> nodes0 = assignment.assignment().get(part); if (nodes0.isEmpty()) @@ -549,10 +564,10 @@ public class GridAffinityAssignmentCache { } if (rmvCnt > 0) { - Iterator<GridAffinityAssignment> it = affCache.values().iterator(); + Iterator<HistoryAffinityAssignment> it = affCache.values().iterator(); while (it.hasNext() && rmvCnt > 0) { - GridAffinityAssignment aff0 = it.next(); + AffinityAssignment aff0 = it.next(); it.remove(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 1726d02..b9182ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -385,10 +386,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } try { + AffinityAssignment assign0 = cctx.affinity().assignment(topVer); + + GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? + (GridAffinityAssignment)assign0 : + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + AffinityInfo info = new AffinityInfo( cctx.config().getAffinity(), cctx.config().getAffinityMapper(), - new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)), + assign, cctx.cacheObjectContext()); IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info)); @@ -562,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return nodes.iterator().next(); } + /** + * @param aff Affinity function. + * @param nodeFilter Node class. + * @param backups Number of backups. + * @param parts Number of partitions. + * @return Key to find caches with similar affinity. 
+ */ + public Object similaryAffinityKey(AffinityFunction aff, + IgnitePredicate<ClusterNode> nodeFilter, + int backups, + int parts) { + return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts); + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -960,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return aff; } } + + /** + * + */ + private static class SimilarAffinityKey { + /** */ + private final int backups; + + /** */ + private final Class<?> affFuncCls; + + /** */ + private final Class<?> filterCls; + + /** */ + private final int partsCnt; + + /** */ + private final int hash; + + /** + * @param affFuncCls Affinity function class. + * @param filterCls Node filter class. + * @param backups Number of backups. + * @param partsCnt Number of partitions. + */ + SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) { + this.backups = backups; + this.affFuncCls = affFuncCls; + this.filterCls = filterCls; + this.partsCnt = partsCnt; + + int hash = backups; + hash = 31 * hash + affFuncCls.hashCode(); + hash = 31 * hash + filterCls.hashCode(); + hash= 31 * hash + partsCnt; + + this.hash = hash; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return hash; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + SimilarAffinityKey key = (SimilarAffinityKey)o; + + return backups == key.backups && + affFuncCls == key.affFuncCls && + filterCls == key.filterCls && + partsCnt == key.partsCnt; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SimilarAffinityKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index c24dd2d..abd5292 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -180,10 +180,16 @@ class GridAffinityUtils { cctx.affinity().affinityReadyFuture(topVer).get(); + AffinityAssignment assign0 = cctx.affinity().assignment(topVer); + + GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? + (GridAffinityAssignment)assign0 : + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + return F.t( affinityMessage(ctx, cctx.config().getAffinity()), affinityMessage(ctx, cctx.config().getAffinityMapper()), - new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer))); + assign); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java new file mode 100644 index 0000000..e502dd5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * + */ +public class HistoryAffinityAssignment implements AffinityAssignment { + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final List<List<ClusterNode>> assignment; + + /** */ + private final List<List<ClusterNode>> idealAssignment; + + /** */ + private final boolean clientEvtChange; + + /** + * @param assign Assignment. 
+ */ + public HistoryAffinityAssignment(GridAffinityAssignment assign) { + this.topVer = assign.topologyVersion(); + this.assignment = assign.assignment(); + this.idealAssignment = assign.idealAssignment(); + this.clientEvtChange = assign.clientEventChange(); + } + + /** {@inheritDoc} */ + @Override public boolean clientEventChange() { + return clientEvtChange; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> idealAssignment() { + return idealAssignment; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignment() { + return assignment; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> get(int part) { + assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + + " [part=" + part + ", partitions=" + assignment.size() + ']'; + + return assignment.get(part); + } + + /** {@inheritDoc} */ + @Override public HashSet<UUID> getIds(int part) { + assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" + + " [part=" + part + ", partitions=" + assignment.size() + ']'; + + List<ClusterNode> nodes = assignment.get(part); + + HashSet<UUID> ids = U.newHashSet(nodes.size()); + + for (int i = 0; i < nodes.size(); i++) + ids.add(nodes.get(i).id()); + + return ids; + } + + /** {@inheritDoc} */ + @Override public Set<ClusterNode> primaryPartitionNodes() { + Set<ClusterNode> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + if (!F.isEmpty(nodes)) + res.add(nodes.get(0)); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public Set<Integer> primaryPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId)) + res.add(p); 
+ } + + return res; + } + + /** {@inheritDoc} */ + @Override public Set<Integer> backupPartitions(UUID nodeId) { + Set<Integer> res = new HashSet<>(); + + for (int p = 0; p < assignment.size(); p++) { + List<ClusterNode> nodes = assignment.get(p); + + for (int i = 1; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (node.id().equals(nodeId)) { + res.add(p); + + break; + } + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return topVer.hashCode(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || !(o instanceof AffinityAssignment)) + return false; + + return topVer.equals(((AffinityAssignment)o).topologyVersion()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HistoryAffinityAssignment.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 1aedf4e..2890887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param node Event node. * @param topVer Topology version. 
*/ - public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. registeredCaches.clear(); @@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param msg Customer message. * @return {@code True} if minor topology version should be increased. */ - public boolean onCustomEvent(CacheAffinityChangeMessage msg) { + boolean onCustomEvent(CacheAffinityChangeMessage msg) { assert lateAffAssign : msg; if (msg.exchangeId() != null) { @@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param top Topology. * @param checkCacheId Cache ID. */ - public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { + void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) { if (!lateAffAssign) return; @@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert assignment != null; + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { List<List<ClusterNode>> idealAssignment = aff.idealAssignment(); @@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else newAssignment = idealAssignment; - aff.initialize(topVer, newAssignment); + aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache)); } }); } @@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds(); + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + 
forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion affTopVer = aff.lastVersion(); @@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assignment.set(part, nodes); } - aff.initialize(topVer, assignment); + aff.initialize(topVer, cachedAssignment(aff, assignment, affCache)); } else aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer); @@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); + final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + if (!crd) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) @@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = cacheCtx.rebalanceEnabled(); - initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary); + initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache); } return null; @@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean latePrimary = cache.rebalanceEnabled; - initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary); + initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache); } }); @@ -1240,12 +1246,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. + * @param affCache Already calculated assignments (to reduce data stored in history). * @throws IgniteCheckedException If failed. 
*/ private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, - boolean latePrimary) + boolean latePrimary, + Map<Object, List<List<ClusterNode>>> affCache) throws IgniteCheckedException { assert lateAffAssign; @@ -1292,7 +1300,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAssignment == null) newAssignment = idealAssignment; - aff.initialize(fut.topologyVersion(), newAssignment); + aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + } + + /** + * @param aff Assignment cache. + * @param assign Assignment. + * @param affCache Assignments already calculated for other caches. + * @return Assignment. + */ + private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, + List<List<ClusterNode>> assign, + Map<Object, List<List<ClusterNode>>> affCache) { + List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey()); + + if (assign0 != null && assign0.equals(assign)) + assign = assign0; + else + affCache.put(aff.similarAffinityKey(), assign); + + return assign; } /** @@ -1367,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Affinity assignment. * @throws IgniteCheckedException If failed. */ - public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) + private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -1554,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity cache. * @param initAff Existing affinity cache. 
*/ - public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder(boolean rebalanceEnabled, + GridAffinityAssignmentCache aff, + @Nullable GridAffinityAssignmentCache initAff) { this.aff = aff; if (initAff != null) @@ -1606,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created cache is started on coordinator. */ - class CacheHolder1 extends CacheHolder { + private class CacheHolder1 extends CacheHolder { /** */ private final GridCacheContext cctx; @@ -1614,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param cctx Cache context. * @param initAff Current affinity. */ - public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { + CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff); assert !cctx.isLocal() : cctx.name(); @@ -1651,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created if cache is not started on coordinator. 
*/ - static class CacheHolder2 extends CacheHolder { + private static class CacheHolder2 extends CacheHolder { /** */ private final GridCacheSharedContext cctx; http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index e10e5aa..4dcff9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -62,6 +62,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** + * @param id Message ID. + */ + public void id(IgniteUuid id) { + this.id = id; + } + + /** * @return Collection of change requests. 
*/ public Collection<DynamicCacheChangeRequest> requests() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 71ae5c9..c6e7ee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -48,7 +48,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1); /** */ - public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " + + private static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " + "yet or cache was already stopped): "; /** Affinity cached function. 
*/ @@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param topVer Topology version. * @return Affinity assignment. */ - public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) { + public AffinityAssignment assignment(AffinityTopologyVersion topVer) { if (cctx.isLocal()) topVer = LOC_CACHE_TOP_VER; http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 00d2d16..503b334 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -44,7 +44,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -56,6 +58,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -64,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -71,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -80,6 +85,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import 
org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; @@ -531,8 +537,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (top != null) return top; + Object affKey = null; + + DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + + if (desc != null) { + CacheConfiguration ccfg = desc.cacheConfiguration(); + + AffinityFunction aff = ccfg.getAffinity(); + + affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff, + ccfg.getNodeFilter(), + ccfg.getBackups(), + aff.partitions()); + } + GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId, - top = new GridClientPartitionTopology(cctx, cacheId, exchFut)); + top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey)); return old != null ? old : top; } @@ -761,40 +782,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param nodes Nodes. * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<? 
extends ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); - - boolean useOldApi = false; - - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - useOldApi = true; - } - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started()) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) { - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), - locMap.nodeOrder(), - locMap.updateSequence(), - locMap); - } - - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); - } - } - - // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); + private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); for (ClusterNode node : nodes) { try { + assert !node.equals(cctx.localNode()); + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { @@ -811,31 +808,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. - * @param id ID. + * @param nodes Target nodes. + * @param exchId Non-null exchange ID if message is created for exchange. + * @param lastVer Last version. + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. 
*/ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, - cctx.kernalContext().clientNode(), - cctx.versions().last()); + public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes, + @Nullable GridDhtPartitionExchangeId exchId, + @Nullable GridCacheVersion lastVer, + boolean compress) { + GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, + lastVer, + exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); + + boolean useOldApi = false; + + if (nodes != null) { + for (ClusterNode node : nodes) { + if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) { + useOldApi = true; + compress = false; + + break; + } + else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0) + compress = false; + } + } + + m.compress(compress); + + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + boolean ready; - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + if (exchId != null) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0; + } + else + ready = cacheCtx.started(); + + if (ready) { + GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); + if (useOldApi) { + locMap = new GridDhtPartitionFullMap(locMap.nodeId(), + locMap.nodeOrder(), + locMap.updateSequence(), + locMap); + } + + addFullPartitionsMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + 
cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (exchId != null) + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + } } } - for (GridClientPartitionTopology top : clientTops.values()) { - GridDhtPartitionMap2 locMap = top.localPartitionMap(); + // It is important that client topologies be added after contexts. + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { + GridDhtPartitionFullMap map = top.partitionMap(true); + + addFullPartitionsMap(m, + dupData, + compress, + top.cacheId(), + map, + top.similarAffinityKey()); + + if (exchId != null) + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + } + + return m; + } - m.addLocalPartitionMap(top.cacheId(), locMap); + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ + private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, + Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionFullMap map, + Object affKey) { + Integer dupDataCache = null; + + if (compress && affKey != null && !m.containsCache(cacheId)) { + T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().partitionStateEquals(map)) { + GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(), + map.nodeOrder(), + map.updateSequence()); + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet()) + map0.put(e.getKey(), e.getValue().emptyCopy()); + + map = map0; + + dupDataCache = state0.get1(); + } + else + dupData.put(affKey, new T2<>(cacheId, map)); } + m.addFullPartitionsMap(cacheId, map, dupDataCache); + } + + /** + * @param node Node. + * @param id ID. 
+ */ + private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, + id, + cctx.kernalContext().clientNode(), + false); + if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@ -853,6 +959,98 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param targetNode Target node. + * @param exchangeId ID. + * @param clientOnlyExchange Client exchange flag. + * @param sndCounters {@code True} if need send partition update counters. + * @return Message. + */ + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode, + @Nullable GridDhtPartitionExchangeId exchangeId, + boolean clientOnlyExchange, + boolean sndCounters) + { + boolean compress = + targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0; + + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, + clientOnlyExchange, + cctx.versions().last(), + compress); + + Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + + if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) + locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); + + addPartitionMap(m, + dupData, + compress, + cacheCtx.cacheId(), + locMap, + cacheCtx.affinity().affinityCache().similarAffinityKey()); + + if (sndCounters) + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + } + } + + for (GridClientPartitionTopology top : clientTops.values()) { + if (m.partitions() != null && m.partitions().containsKey(top.cacheId())) + continue; + + GridDhtPartitionMap2 
locMap = top.localPartitionMap(); + + addPartitionMap(m, + dupData, + compress, + top.cacheId(), + locMap, + top.similarAffinityKey()); + + if (sndCounters) + m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + } + + return m; + } + + /** + * @param m Message. + * @param dupData Duplicated data map. + * @param compress {@code True} if need check for duplicated partition state data. + * @param cacheId Cache ID. + * @param map Map to add. + * @param affKey Cache affinity key. + */ + private void addPartitionMap(GridDhtPartitionsSingleMessage m, + Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData, + boolean compress, + Integer cacheId, + GridDhtPartitionMap2 map, + Object affKey) { + Integer dupDataCache = null; + + if (compress) { + T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey); + + if (state0 != null && state0.get2().equals(map.map())) { + dupDataCache = state0.get1(); + + map = map.emptyCopy(); + } + else + dupData.put(affKey, new T2<>(cacheId, map.map())); + } + + m.addLocalPartitionMap(cacheId, map, dupDataCache); + } + + /** * @param nodeId Cause node ID. * @param topVer Topology version. * @param evt Event type. @@ -869,7 +1067,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param affChangeMsg Affinity change message. * @return Exchange future. 
*/ - GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, + private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs, @Nullable CacheAffinityChangeMessage affChangeMsg) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index fd6abbd..5e777fd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1958,8 +1958,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.template(true); - req.deploymentId(desc.deploymentId()); - reqs.add(req); } @@ -1972,6 +1970,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { batch.clientReconnect(reconnect); + // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. 
+ batch.id(null); + return batch; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 58933b7..5efb317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; + /** */ + private static final Long ZERO = 0L; + /** Cache shared context. */ private GridCacheSharedContext cctx; @@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Partition update counters. */ private Map<Integer, Long> cntrMap = new HashMap<>(); + /** */ + private final Object similarAffKey; + /** * @param cctx Context. * @param cacheId Cache ID. * @param exchFut Exchange ID. + * @param similarAffKey Key to find caches with similar affinity. 
*/ public GridClientPartitionTopology( GridCacheSharedContext cctx, int cacheId, - GridDhtPartitionsExchangeFuture exchFut + GridDhtPartitionsExchangeFuture exchFut, + Object similarAffKey ) { this.cctx = cctx; this.cacheId = cacheId; + this.similarAffKey = similarAffKey; topVer = exchFut.topologyVersion(); @@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** + * @return Key to find caches with similar affinity. + */ + @Nullable public Object similarAffinityKey() { + return similarAffKey; + } + + /** * @return Full map string representation. */ @SuppressWarnings( {"ConstantConditions"}) @@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - return new HashMap<>(cntrMap); + if (skipZeros) { + Map<Integer, Long> res = U.newHashMap(cntrMap.size()); + + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + if (!e.getValue().equals(ZERO)) + res.put(e.getKey(), e.getValue()); + } + + return res; + } + else + return new HashMap<>(cntrMap); } finally { lock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 6e9b907..4ae4e47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology { @Nullable Map<Integer, Long> cntrMap); /** + * @param skipZeros If {@code true} then filters out zero counters. * @return Partition update counters. */ - public Map<Integer, Long> updateCounters(); + public Map<Integer, Long> updateCounters(boolean skipZeros); /** * @param part Partition to own. http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 50f7f0f..f3751ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Flag to control amount of output for full map. */ private static final boolean FULL_MAP_DEBUG = false; + /** */ + private static final Long ZERO = 0L; + /** Context. */ private final GridCacheContext<?, ?> cctx; @@ -859,7 +862,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { - GridAffinityAssignment affAssignment = cctx.affinity().assignment(topVer); + AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); List<ClusterNode> affNodes = affAssignment.get(p); @@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - Map<Integer, Long> res = new HashMap<>(cntrMap); + Map<Integer, Long> res; + + if (skipZeros) { + res = U.newHashMap(cntrMap.size()); + + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = e.getValue(); + + if (ZERO.equals(cntr)) + continue; + + res.put(e.getKey(), cntr); + } + } + else + res = new HashMap<>(cntrMap); for (int i = 0; i < locParts.length; i++) { GridDhtLocalPartition part = locParts[i]; @@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { continue; Long cntr0 = res.get(part.id()); - Long cntr1 = part.updateCounter(); + long cntr1 = part.updateCounter(); + + if (skipZeros && cntr1 == 0L) + continue; if (cntr0 == null || cntr1 > cntr0) res.put(part.id(), cntr1); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index 498d492..8f5ad17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> } /** + * @param fullMap Map. + * @return {@code True} if this map and given map contain the same data. + */ + public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) { + if (size() != fullMap.size()) + return false; + + for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) { + GridDhtPartitionMap2 m = fullMap.get(e.getKey()); + + if (m == null || !m.map().equals(e.getValue().map())) + return false; + } + + return true; + } + + /** * @param updateSeq New update sequence value. * @return Old update sequence value. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 15b5a2e..ce36a11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -61,27 +61,24 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E private volatile int moving; /** - * @param nodeId Node ID. - * @param updateSeq Update sequence number. + * Empty constructor required for {@link Externalizable}. */ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq) { - assert nodeId != null; - assert updateSeq > 0; - - this.nodeId = nodeId; - this.updateSeq = updateSeq; - - map = new HashMap<>(); + public GridDhtPartitionMap2() { + // No-op. } /** * @param nodeId Node ID. * @param updateSeq Update sequence number. + * @param top Topology version. * @param m Map to copy. * @param onlyActive If {@code true}, then only active states will be included. 
*/ - public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, - Map<Integer, GridDhtPartitionState> m, boolean onlyActive) { + public GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> m, + boolean onlyActive) { assert nodeId != null; assert updateSeq > 0; @@ -100,10 +97,33 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E } /** - * Empty constructor required for {@link Externalizable}. + * @param nodeId Node ID. + * @param updateSeq Update sequence number. + * @param top Topology version. + * @param map Map. + * @param moving Number of moving partitions. */ - public GridDhtPartitionMap2() { - // No-op. + private GridDhtPartitionMap2(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> map, + int moving) { + this.nodeId = nodeId; + this.updateSeq = updateSeq; + this.top = top; + this.map = map; + this.moving = moving; + } + + /** + * @return Copy with empty partition state map. 
+ */ + public GridDhtPartitionMap2 emptyCopy() { + return new GridDhtPartitionMap2(nodeId, + updateSeq, + top, + U.<Integer, GridDhtPartitionState>newHashMap(0), + 0); } /** @@ -277,9 +297,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E long ver = in.readLong(); int minorVer = in.readInt(); - if (ver != 0) { + if (ver != 0) top = new AffinityTopologyVersion(ver, minorVer); - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 4e714ed..6e69161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable; /** * Request for single partition info. 
*/ -abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { +public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { + /** */ + public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11"); + + /** */ + protected static final byte COMPRESSED_FLAG_MASK = 1; + /** */ private static final long serialVersionUID = 0L; @@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { /** Last used cache version. */ private GridCacheVersion lastVer; + /** */ + private byte flags; + /** * Required by {@link Externalizable}. */ @@ -79,6 +89,20 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { return lastVer; } + /** + * @return {@code True} if message data is compressed. + */ + protected final boolean compressed() { + return (flags & COMPRESSED_FLAG_MASK) != 0; + } + + /** + * @param compressed {@code True} if message data is compressed. + */ + protected final void compressed(boolean compressed) { + flags = compressed ? 
(byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK); + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -101,6 +125,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { writer.incrementState(); case 4: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 5: if (!writer.writeMessage("lastVer", lastVer)) return false; @@ -131,6 +161,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { reader.incrementState(); case 4: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: lastVer = reader.readMessage("lastVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 80b3768..f391265 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; 
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); if (updateTop && clientTop != null) - cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters()); + cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); } top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); @@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (top.cacheId() == cacheCtx.cacheId()) { cacheCtx.topology().update(exchId, top.partitionMap(true), - top.updateCounters()); + top.updateCounters(false)); break; } @@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (!centralizedAff) - sendLocalPartitions(crd, exchId); + sendLocalPartitions(crd); initDone(); @@ -928,27 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param node Node. - * @param id ID. * @throws IgniteCheckedException If failed. 
*/ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) + private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node, + exchangeId(), clientOnlyExchange, - cctx.versions().last()); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); - - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); - - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - - m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } + true); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -964,51 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param nodes Target nodes. - * @return Message; + * @param compress {@code True} if it is possible to use compression for message. + * @return Message. */ - private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) { + private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) { GridCacheVersion last = lastVer.get(); - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(), + return cctx.exchange().createPartitionsFullMessage(nodes, + exchangeId(), last != null ? 
last : cctx.versions().last(), - topologyVersion()); - - boolean useOldApi = false; - - if (nodes != null) { - for (ClusterNode node : nodes) { - if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) - useOldApi = true; - } - } - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); - - boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0; - - if (ready) { - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); - - if (useOldApi) - locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap); - - m.addFullPartitionsMap(cacheCtx.cacheId(), locMap); - - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); - } - } - } - - // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); - - m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); - } - - return m; + compress); } /** @@ -1016,7 +970,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. 
*/ private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes); + GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true); + + assert !nodes.contains(cctx.localNode()); if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + @@ -1030,7 +986,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void sendPartitions(ClusterNode oldestNode) { try { - sendLocalPartitions(oldestNode, exchId); + sendLocalPartitions(oldestNode); } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) @@ -1234,7 +1190,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get(); - GridDhtPartitionsFullMessage m = createPartitionsMessage(null); + GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false); CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
