Repository: ignite Updated Branches: refs/heads/ignite-4154-opt2 6e1028bde -> 160cb202a
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/160cb202 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/160cb202 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/160cb202 Branch: refs/heads/ignite-4154-opt2 Commit: 160cb202ad2002087629ba3773bed4b8f66d3498 Parents: 6e1028b Author: sboikov <[email protected]> Authored: Wed Nov 16 15:18:03 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 16 15:18:03 2016 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 2 +- .../affinity/AffinityCalculateCache.java | 87 +++++++++ .../affinity/AffinityConfiguration.java | 59 ++++++ .../affinity/GridAffinityAssignmentCache.java | 52 ++++-- .../cache/CacheAffinitySharedManager.java | 182 ++++++++++++++++--- .../cache/GridCacheAffinityManager.java | 7 +- 6 files changed, 341 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index f65bf52..354ea10 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -2402,7 +2402,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** * Filter that accepts all nodes. */ - public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode> { + public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java new file mode 100644 index 0000000..80ee238 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class AffinityCalculateCache { + /** */ + private final Map<Object, List<List<ClusterNode>>> assignCache = new HashMap<>(); + + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final DiscoveryEvent discoEvt; + + /** */ + private Map<Integer, List<List<ClusterNode>>> grpAssign; + + public AffinityCalculateCache(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { + this.topVer = topVer; + this.discoEvt = discoEvt; + } + + public List<List<ClusterNode>> assignPartitions(AffinityFunction aff, + int backups, + List<ClusterNode> nodes, + List<List<ClusterNode>> prevAssignment, + @Nullable Integer affGrp, + Object affKey) { + if (affGrp != null && grpAssign != null) { + List<List<ClusterNode>> calcAssign = grpAssign.get(affGrp); + + if (calcAssign != null) + return calcAssign; + } + + AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, + discoEvt, + topVer, + backups); + + List<List<ClusterNode>> assign = aff.assignPartitions(ctx); + + List<List<ClusterNode>> assign0 = assignCache.get(affKey); + + if (assign0 != null && assign0.equals(assign)) + assign = assign0; + else + assignCache.put(affKey, assign); + + if (affGrp != null) { + if (grpAssign == null) + grpAssign = new HashMap<>(); + + grpAssign.put(affGrp, assign); + } + + return assign; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java new file mode 100644 index 0000000..c9ebbbc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * + */ +public class AffinityConfiguration { + /** */ + private final AffinityFunction aff; + + /** */ + private final IgnitePredicate<ClusterNode> nodeFilter; + + /** */ + private final int backups; + + /** + * @param aff + * @param nodeFilter + * @param backups + */ + public AffinityConfiguration(AffinityFunction aff, IgnitePredicate<ClusterNode> nodeFilter, int backups) { + this.aff = aff; + this.nodeFilter = nodeFilter; + this.backups = backups; + } + + public AffinityFunction affinityFunction() { + return aff; + } + + public IgnitePredicate<ClusterNode> nodeFilter() { + return nodeFilter; + } + + public int backups() { + return backups; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a388c7a..3b62858 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityCentralizedFunction; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.GridKernalContext; @@ -110,35 +111,35 @@ public class GridAffinityAssignmentCache { /** */ private final Object similarAffKey; + /** */ + private final Integer affGrp; + /** * Constructs affinity cached calculations. * * @param ctx Kernal context. * @param cacheName Cache name. - * @param aff Affinity function. - * @param nodeFilter Node filter. - * @param backups Number of backups. + * @param affCfg Affinity configuration. * @param locCache Local cache flag. */ @SuppressWarnings("unchecked") public GridAffinityAssignmentCache(GridKernalContext ctx, String cacheName, - AffinityFunction aff, - IgnitePredicate<ClusterNode> nodeFilter, - int backups, + AffinityConfiguration affCfg, boolean locCache) { assert ctx != null; - assert aff != null; - assert nodeFilter != null; this.ctx = ctx; - this.aff = aff; - this.nodeFilter = nodeFilter; this.cacheName = cacheName; - this.backups = backups; + this.aff = affCfg.affinityFunction(); + this.nodeFilter = affCfg.nodeFilter(); + this.backups = affCfg.backups(); this.locCache = locCache; + assert aff != null; + assert nodeFilter != null; + cacheId = CU.cacheId(cacheName); log = ctx.log(GridAffinityAssignmentCache.class); @@ -148,8 +149,9 @@ public class GridAffinityAssignmentCache { head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt); - assert similarAffKey != null; + + affGrp = locCache ? null : ctx.cache().context().affinity().equalAffinityGroup(cacheId, affCfg); } /** @@ -255,7 +257,9 @@ public class GridAffinityAssignmentCache { * @return Affinity assignments. */ @SuppressWarnings("IfMayBeConditional") - public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, + DiscoveryEvent discoEvt, + AffinityCalculateCache cache) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); @@ -273,20 +277,28 @@ public class GridAffinityAssignmentCache { else sorted = Collections.singletonList(ctx.discovery().localNode()); - List<List<ClusterNode>> assignment; + List<List<ClusterNode>> assignment = null; if (prevAssignment != null && discoEvt != null) { boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter); if (!affNode) assignment = prevAssignment; - else - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, - discoEvt, topVer, backups)); } - else - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, - topVer, backups)); + + if (assignment == null) { + if (cache != null) + assignment = cache.assignPartitions(aff, backups, sorted, prevAssignment, affGrp, similarAffKey); + else { + AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(sorted, + prevAssignment, + discoEvt, + topVer, + backups); + + assignment = aff.assignPartitions(ctx); + } + } assert assignment != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index b50479d..bae3e9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -30,12 +30,18 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver; +import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityCalculateCache; +import org.apache.ignite.internal.processors.affinity.AffinityConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; @@ -97,6 +103,109 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); + /** */ + private final Map<Integer, EqualAffinityCacheGroup> eqAffCacheGroups = new HashMap<>(); + + /** + * + */ + private class EqualAffinityCacheGroup { + /** */ + private final Map<Integer, AffinityConfiguration> caches = new HashMap<>(); + + private EqualAffinityCacheGroup(Integer cacheId, AffinityConfiguration cfg) { + caches.put(cacheId, cfg); + } + + void add(Integer cacheId, AffinityConfiguration cfg) { + caches.put(cacheId, cfg); + } + + /** + * @param cfg Affinity configuration. + * @return {@code True} if cache configurations have exactly the same affinity configuration. + */ + boolean equalAffinity(AffinityConfiguration cfg) { + assert !caches.isEmpty(); + + AffinityConfiguration cfg0 = F.firstValue(caches); + + assert cfg0 != null; + + if (cfg0.backups() != cfg.backups()) + return false; + + if (!cfg0.nodeFilter().equals(cfg.nodeFilter())) + return false; + + if (cfg0.affinityFunction().getClass() != cfg.affinityFunction().getClass()) + return false; + + if (cfg0.affinityFunction() == cfg.affinityFunction()) + return true; + + if (cfg0.affinityFunction() instanceof RendezvousAffinityFunction) { + RendezvousAffinityFunction f1 = (RendezvousAffinityFunction)cfg0.affinityFunction(); + RendezvousAffinityFunction f2 = (RendezvousAffinityFunction)cfg.affinityFunction(); + + if (f1.getHashIdResolver() != f2.getHashIdResolver()) { + if (f1.getHashIdResolver() == null || f2.getHashIdResolver() == null) + return false; + + boolean eqRslvr = (f1.getHashIdResolver().getClass() == f2.getHashIdResolver().getClass()) && + (f1.getHashIdResolver().getClass() == AffinityNodeAddressHashResolver.class || + f1.getHashIdResolver().getClass() == AffinityNodeIdHashResolver.class); + + if (!eqRslvr) + return false; + } + + return f1.partitions() == f2.partitions() && + f1.isExcludeNeighbors() == f2.isExcludeNeighbors() && + f1.getBackupFilter() == f2.getBackupFilter() && + f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter(); + } + else if (cfg0.affinityFunction() instanceof FairAffinityFunction) { + FairAffinityFunction f1 = (FairAffinityFunction)cfg0.affinityFunction(); + FairAffinityFunction f2 = (FairAffinityFunction)cfg.affinityFunction(); + + return f1.partitions() == f2.partitions() && + f1.isExcludeNeighbors() == f2.isExcludeNeighbors() && + f1.getBackupFilter() == f2.getBackupFilter() && + f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter(); + } + else + return false; + } + } + + @Nullable public Integer equalAffinityGroup(Integer cacheId, AffinityConfiguration cfg) { + if (!(cfg.affinityFunction().getClass() == RendezvousAffinityFunction.class || + cfg.affinityFunction().getClass() == FairAffinityFunction.class)) + return null; + + synchronized (eqAffCacheGroups) { + for (Map.Entry<Integer, EqualAffinityCacheGroup> e : eqAffCacheGroups.entrySet()) { + EqualAffinityCacheGroup grp = e.getValue(); + + if (grp.caches.containsKey(cacheId)) + return e.getKey(); + + if (e.getValue().equalAffinity(cfg)) { + e.getValue().add(cacheId, cfg); + + return e.getKey(); + } + } + + Integer grp = eqAffCacheGroups.size(); + + eqAffCacheGroups.put(grp, new EqualAffinityCacheGroup(cacheId, cfg)); + + return grp; + } + } + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -372,6 +481,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Set<Integer> stoppedCaches = null; + AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); + for (DynamicCacheChangeRequest req : reqs) { if (!(req.clientStartOnly() || req.close())) clientOnly = false; @@ -394,7 +505,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId()); if (clientCacheStarted) - initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign); + initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign, affCache); else if (!req.clientStartOnly()) { assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion()); @@ -403,7 +514,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent()); + fut.discoveryEvent(), + affCache); aff.initialize(fut.topologyVersion(), assignment); } @@ -753,7 +865,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + null); cache.affinity().initialize(fut.topologyVersion(), newAff); } @@ -785,13 +899,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (crd && lateAffAssign) { + final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), + fut.discoveryEvent()); + forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { CacheHolder cache = cache(fut, desc); if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache); cache.affinity().initialize(fut.topologyVersion(), assignment); } @@ -799,10 +916,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap }); } else { + final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), + fut.discoveryEvent()); + forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(aff, fut, false); + initAffinity(aff, fut, false, affCache); } }); } @@ -814,10 +934,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param fetch Force fetch flag. * @throws IgniteCheckedException If failed. */ - private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch) + private void initAffinity(GridAffinityAssignmentCache aff, + GridDhtPartitionsExchangeFuture fut, + boolean fetch, + AffinityCalculateCache affCache) throws IgniteCheckedException { if (!fetch && canCalculateAffinity(aff, fut)) { - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache); aff.initialize(fut.topologyVersion(), assignment); } @@ -872,13 +995,18 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (lateAffAssign) { if (locJoin) { if (crd) { + final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), + fut.discoveryEvent()); + forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); CacheHolder cache = cache(fut, cacheDesc); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, + fut.discoveryEvent(), + affCache); cache.affinity().initialize(topVer, newAff); } @@ -939,6 +1067,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>(); + AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -946,8 +1076,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId()); if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { - List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + List<List<ClusterNode>> assignment = cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + affCache); cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); } @@ -990,7 +1121,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), null); affCache.initialize(topVer, aff); } @@ -1002,7 +1133,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent()); + affCache.calculate(topVer, fut.discoveryEvent(), null); } List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); @@ -1028,11 +1159,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean centralizedAff; if (lateAffAssign) { + AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), + fut.discoveryEvent()); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache); } centralizedAff = true; @@ -1061,11 +1195,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap long start = System.currentTimeMillis(); + AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; - initAffinity(cacheCtx.affinity().affinityCache(), fut, false); + initAffinity(cacheCtx.affinity().affinityCache(), fut, false, affCache); } log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis() - start) + ']'); @@ -1080,13 +1216,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); + final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); + forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { CacheHolder cache = caches.get(desc.cacheId()); if (cache != null) { if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache); return; } @@ -1137,7 +1275,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); - aff.calculate(fut.topologyVersion(), fut.discoveryEvent()); + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache); affFut.onDone(fut.topologyVersion()); } @@ -1218,7 +1356,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); - final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent()); if (!crd) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -1261,7 +1399,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, boolean latePrimary, - Map<Object, List<List<ClusterNode>>> affCache) + AffinityCalculateCache affCache) throws IgniteCheckedException { assert lateAffAssign; @@ -1277,7 +1415,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent()); + List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), affCache); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { @@ -1308,7 +1446,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (newAssignment == null) newAssignment = idealAssignment; - aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); + aff.initialize(fut.topologyVersion(), newAssignment); } /** @@ -1732,9 +1870,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(), ccfg.getName(), - affFunc, - ccfg.getNodeFilter(), - ccfg.getBackups(), + new AffinityConfiguration(affFunc, ccfg.getNodeFilter(), ccfg.getBackups()), ccfg.getCacheMode() == LOCAL); return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff); http://git-wip-us.apache.org/repos/asf/ignite/blob/160cb202/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index c6e7ee6..6b47f82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.util.GridLeanSet; @@ -67,9 +68,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { aff = new GridAffinityAssignmentCache(cctx.kernalContext(), cctx.namex(), - affFunction, - cctx.config().getNodeFilter(), - cctx.config().getBackups(), + new AffinityConfiguration(affFunction, cctx.config().getNodeFilter(), cctx.config().getBackups()), cctx.isLocal()); } @@ -77,7 +76,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. - aff.calculate(LOC_CACHE_TOP_VER, null); + aff.calculate(LOC_CACHE_TOP_VER, null, null); } /** {@inheritDoc} */
