IGNITE-5398 - Reuse sort container across multiple affinity assignment calculations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b2ac9c9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b2ac9c9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b2ac9c9 Branch: refs/heads/ignite-5398 Commit: 6b2ac9c98fc4774816f895870b28fbeb3898e20c Parents: f248292 Author: Alexey Goncharuk <[email protected]> Authored: Fri Jun 2 20:12:36 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Jun 2 20:12:36 2017 +0300 ---------------------------------------------------------------------- .../cache/affinity/AffinityFunctionContext.java | 14 ++++ .../rendezvous/RendezvousAffinityFunction.java | 72 ++++++++++++++------ .../affinity/AffinityAttachmentHolder.java | 40 +++++++++++ .../affinity/GridAffinityAssignmentCache.java | 26 +++++-- .../GridAffinityFunctionContextImpl.java | 24 ++++++- .../cache/CacheAffinitySharedManager.java | 54 +++++++++------ .../cache/GridCacheAffinityManager.java | 3 +- .../GridCachePartitionExchangeManager.java | 7 ++ .../GridDhtPartitionsExchangeFuture.java | 18 ++++- .../AbstractAffinityFunctionSelfTest.java | 9 +-- .../cache/GridCacheAffinityApiSelfTest.java | 17 ++--- .../GridCachePartitionedAffinitySpreadTest.java | 35 +++++++--- ...dCachePartitionedQueueEntryMoveSelfTest.java | 3 +- .../CacheLateAffinityAssignmentTest.java | 10 ++- ...niteCacheClientNodeChangingTopologyTest.java | 4 +- 15 files changed, 258 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java index fd071cb..d5c021a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunctionContext.java @@ -68,4 +68,18 @@ public interface AffinityFunctionContext { * not available. */ @Nullable public DiscoveryEvent discoveryEvent(); + + /** + * Gets an optional context attachment that will be passed across different cache affinity calculations. + * + * @return Optional user attachment. + */ + @Nullable public <T> T attachment(); + + /** + * Sets an optional context attachment that will be passed across different cache affinity calculations. + * + * @param attachment Attachment to set. + */ + public <T> void attachment(T attachment); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index 021f4e2..7a741cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -40,12 +40,10 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.GridCacheUtils; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; @@ -79,7 +77,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza public static final int DFLT_PARTITION_COUNT = 1024; /** Comparator. */ - private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); + private static final Comparator<NodeWithHash> COMPARATOR = new HashComparator(); /** Number of partitions. */ private int parts; @@ -336,26 +334,27 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, - @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { + @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache, + NodeWithHash[] sortContainer + ) { if (nodes.size() <= 1) return nodes; - IgniteBiTuple<Long, ClusterNode> [] hashArr = - (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()]; + assert sortContainer.length == nodes.size() : "Invalid sort container [part=" + part + + "nodes=" + nodes + ", container=" + Arrays.toString(sortContainer) + ']'; for (int i = 0; i < nodes.size(); i++) { ClusterNode node = nodes.get(i); Object nodeHash = resolveNodeHash(node); - long hash = hash(nodeHash.hashCode(), part); - - hashArr[i] = F.t(hash, node); + sortContainer[i].node = node; + sortContainer[i].hash = hash(nodeHash.hashCode(), part); } final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); - Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups); + Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(sortContainer, primaryAndBackups); Iterator<ClusterNode> it = sortedNodes.iterator(); @@ -474,8 +473,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + NodeWithHash[] container = getOrCreateSortContainer(affCtx); + for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache); + List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache, container); assignments.add(partAssignment); } @@ -488,6 +489,26 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza // No-op. } + /** + * @param ctx Affinity function context. + * @return Array to use for partition assignment. + */ + private NodeWithHash[] getOrCreateSortContainer(AffinityFunctionContext ctx) { + NodeWithHash[] prev = ctx.attachment(); + List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); + + if (prev == null || prev.length != topSnapshot.size()) { + prev = new NodeWithHash[topSnapshot.size()]; + + for (int i = 0; i < prev.length; i++) + prev[i] = new NodeWithHash(); + + ctx.attachment(prev); + } + + return prev; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(parts); @@ -508,14 +529,14 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * */ - private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + private static class HashComparator implements Comparator<NodeWithHash>, Serializable { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { - return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 : - o1.get2().id().compareTo(o2.get2().id()); + @Override public int compare(NodeWithHash o1, NodeWithHash o2) { + return o1.hash < o2.hash ? -1 : o1.hash > o2.hash ? 1 : + o1.node.id().compareTo(o2.node.id()); } } @@ -524,7 +545,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza */ private static class LazyLinearSortedContainer implements Iterable<ClusterNode> { /** Initial node-hash array. */ - private final IgniteBiTuple<Long, ClusterNode>[] arr; + private final NodeWithHash[] arr; /** Count of the sorted elements */ private int sorted; @@ -533,7 +554,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * @param arr Node / partition hash list. * @param needFirstSortedCnt Estimate count of elements to return by iterator. */ - LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) { + LazyLinearSortedContainer(NodeWithHash[] arr, int needFirstSortedCnt) { this.arr = arr; if (needFirstSortedCnt > (int)Math.log(arr.length)) { @@ -566,9 +587,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza throw new NoSuchElementException(); if (cur < sorted) - return arr[cur++].get2(); + return arr[cur++].node; - IgniteBiTuple<Long, ClusterNode> min = arr[cur]; + NodeWithHash min = arr[cur]; int minIdx = cur; @@ -588,7 +609,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza sorted = cur++; - return min.get2(); + return min.node; } /** {@inheritDoc} */ @@ -597,4 +618,15 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza } } } + + /** + * A specific tuple which holds a primitive long to avoid unneccessary boxing. + */ + public static class NodeWithHash { + /** */ + private ClusterNode node; + + /** */ + private long hash; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java new file mode 100644 index 0000000..0467bc7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAttachmentHolder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +/** + * + */ +public class AffinityAttachmentHolder { + /** */ + private Object attachment; + + /** + * @return User attachment. + */ + public Object attachment() { + return attachment; + } + + /** + * @param attachment User attachment. + */ + public void attachment(Object attachment) { + this.attachment = attachment; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 5070462..3fc59fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -257,8 +257,12 @@ public class GridAffinityAssignmentCache { * @return Affinity assignments. */ @SuppressWarnings("IfMayBeConditional") - public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt, - DiscoCache discoCache) { + public List<List<ClusterNode>> calculate( + AffinityTopologyVersion topVer, + DiscoveryEvent discoEvt, + DiscoCache discoCache, + AffinityAttachmentHolder holder + ) { if (log.isDebugEnabled()) log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + ", discoEvt=" + discoEvt + ']'); @@ -284,12 +288,22 @@ public class GridAffinityAssignmentCache { if (!affNode) assignment = prevAssignment; else - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, - discoEvt, topVer, backups)); + assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl( + sorted, + prevAssignment, + discoEvt, + topVer, + backups, + holder)); } else - assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, - topVer, backups)); + assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl( + sorted, + prevAssignment, + discoEvt, + topVer, + backups, + holder)); assert assignment != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java index e2bb99d..05ee846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java @@ -43,17 +43,27 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext /** Number of backups to assign. */ private final int backups; + /** */ + private AffinityAttachmentHolder holder; + /** * @param topSnapshot Topology snapshot. * @param topVer Topology version. */ - public GridAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment, - DiscoveryEvent discoEvt, @NotNull AffinityTopologyVersion topVer, int backups) { + public GridAffinityFunctionContextImpl( + List<ClusterNode> topSnapshot, + List<List<ClusterNode>> prevAssignment, + DiscoveryEvent discoEvt, + @NotNull AffinityTopologyVersion topVer, + int backups, + AffinityAttachmentHolder holder + ) { this.topSnapshot = topSnapshot; this.prevAssignment = prevAssignment; this.discoEvt = discoEvt; this.topVer = topVer; this.backups = backups; + this.holder = holder; } /** {@inheritDoc} */ @@ -81,6 +91,16 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext return backups; } + /** {@inheritDoc} */ + @Nullable @Override public <T> T attachment() { + return (T)holder.attachment(); + } + + /** {@inheritDoc} */ + @Override public <T> void attachment(T attachment) { + holder.attachment(attachment); + } + /** * Gets the previous assignment. * http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 5fdd1bc..8051768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -343,9 +343,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return {@code True} if client-only exchange is needed. */ - public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut, + public boolean onCacheChangeRequest( + final GridDhtPartitionsExchangeFuture fut, boolean crd, - Collection<DynamicCacheChangeRequest> reqs) + Collection<DynamicCacheChangeRequest> reqs + ) throws IgniteCheckedException { assert !F.isEmpty(reqs) : fut; @@ -426,8 +428,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion(); - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), - fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> assignment = aff.calculate( + fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache(), + fut.attachmentHolder()); aff.initialize(fut.topologyVersion(), assignment); @@ -795,7 +800,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), + fut.discoCache(), fut.attachmentHolder()); cache.affinity().initialize(fut.topologyVersion(), newAff); } @@ -833,7 +839,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(), + fut.attachmentHolder()); cache.affinity().initialize(fut.topologyVersion(), assignment); } @@ -861,7 +868,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (!fetch && canCalculateAffinity(aff, fut)) { exchLog.info("initAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); - List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), + fut.discoCache(), fut.attachmentHolder()); aff.initialize(fut.topologyVersion(), assignment); @@ -933,7 +941,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap exchLog.info("onServerJoin calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']'); - List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), + fut.discoCache(), fut.attachmentHolder()); cache.affinity().initialize(topVer, newAff); @@ -1006,7 +1015,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) { List<List<ClusterNode>> assignment = - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), + fut.discoCache(), fut.attachmentHolder()); cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment); } @@ -1050,7 +1060,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap GridDhtAffinityAssignmentResponse res = fetchFut.get(); if (res == null) { - List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache(), + fut.attachmentHolder()); affCache.initialize(topVer, aff); } @@ -1062,7 +1073,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { assert !affCache.centralizedAffinityFunction() || !lateAffAssign; - affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache(), fut.attachmentHolder()); } List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery()); @@ -1096,7 +1107,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap exchLog.info("onServerLeft calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']'); - cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), + fut.discoCache(), fut.attachmentHolder()); exchLog.info("onServerLeft calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']'); } @@ -1150,7 +1162,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cache != null) { if (cache.client()) - cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(), + fut.attachmentHolder()); return; } @@ -1202,7 +1215,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut); - aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache(), + fut.attachmentHolder()); affFut.onDone(fut.topologyVersion()); } @@ -1328,14 +1342,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. * @param affCache Already calculated assignments (to reduce data stored in history). - * @throws IgniteCheckedException If failed. */ private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, boolean latePrimary, - Map<Object, List<List<ClusterNode>>> affCache) - throws IgniteCheckedException { + Map<Object, List<List<ClusterNode>>> affCache + ) { exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']'); @@ -1352,7 +1365,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache(), + fut.attachmentHolder()); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { @@ -1360,8 +1374,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> newNodes = idealAssignment.get(p); List<ClusterNode> curNodes = curAff.get(p); - ClusterNode curPrimary = curNodes.size() > 0 ? curNodes.get(0) : null; - ClusterNode newPrimary = newNodes.size() > 0 ? newNodes.get(0) : null; + ClusterNode curPrimary = !curNodes.isEmpty() ? curNodes.get(0) : null; + ClusterNode newPrimary = !newNodes.isEmpty() ? newNodes.get(0) : null; if (curPrimary != null && newPrimary != null && !curPrimary.equals(newPrimary)) { assert cctx.discovery().node(topVer, curPrimary.id()) != null : curPrimary; http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 24321b2..0c2499b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -76,7 +77,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. - aff.calculate(LOC_CACHE_TOP_VER, null, null); + aff.calculate(LOC_CACHE_TOP_VER, null, null, new AffinityAttachmentHolder()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0f6a656..fc29a6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -349,6 +349,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @return {@code true} if current thread is exchange worker. + */ + public boolean inExchangeWorkerThread() { + return Thread.currentThread() == exchWorker.runner(); + } + + /** * @return Initial exchange ID. */ private GridDhtPartitionExchangeId initialExchangeId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 110716b..f86c914 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -50,9 +49,9 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -86,7 +85,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -243,6 +241,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT @GridToStringExclude private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + /** */ + private AffinityAttachmentHolder affAttachmentHolder; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -407,6 +408,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Affinity attachment holder to use. + */ + public AffinityAttachmentHolder attachmentHolder() { + return cctx.exchange().inExchangeWorkerThread() ? affAttachmentHolder : new AffinityAttachmentHolder(); + } + + /** * @param cacheId Cache ID to check. * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. @@ -547,6 +555,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT skipPreload = cctx.kernalContext().clientNode(); + affAttachmentHolder = new AffinityAttachmentHolder(); + exchLog.info("Start exchange init [topVer=" + topVer + ", crd=" + crdNode + ", evt=" + discoEvt.type() + @@ -1294,6 +1304,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut.onDone(err == null); + affAttachmentHolder.attachment(null); + if (exchId.isLeft()) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java index 8f8d78a..6490b5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.testframework.GridTestNode; @@ -104,7 +105,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac } /** - * @param backups Number of backups. * @throws Exception If failed. */ public void testNullKeyForPartitionCalculation() throws Exception { @@ -155,7 +155,8 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED, node); GridAffinityFunctionContextImpl ctx = - new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups); + new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups, + new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); @@ -181,7 +182,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac List<List<ClusterNode>> assignment = aff.assignPartitions( new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), - backups)); + backups, new AffinityAttachmentHolder())); info("Assigned."); @@ -254,7 +255,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac List<List<ClusterNode>> assignment = aff.assignPartitions( new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), - backups)); + backups, new AffinityAttachmentHolder())); verifyAssignment(assignment, backups, aff.partitions(), nodes.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java index 25a0be3..c0d2399 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityApiSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.util.typedef.F; @@ -108,7 +109,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { public void testPrimaryPartitionsOneNode() throws Exception { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); @@ -151,7 +152,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); @@ -182,7 +183,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); @@ -216,7 +217,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); @@ -239,7 +240,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); AffinityFunction aff = affinity(); @@ -258,7 +259,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); AffinityFunction aff = affinity(); @@ -278,7 +279,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); AffinityFunction aff = affinity(); @@ -303,7 +304,7 @@ public class GridCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(new ArrayList<>(grid(0).cluster().nodes()), null, null, - new AffinityTopologyVersion(1), 1); + new AffinityTopologyVersion(1), 1, new AffinityAttachmentHolder()); AffinityFunction aff = affinity(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java index 2784295..1af480a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -20,10 +20,14 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -43,7 +47,7 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe for (int i = 5; i < NODES_CNT; i = i * 3 / 2) { for (int replicas = 128; replicas <= 4096; replicas*=2) { - Collection<ClusterNode> nodes = createNodes(i, replicas); + List<ClusterNode> nodes = createNodes(i, replicas); RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 10000); @@ -59,8 +63,8 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe * @param replicas Value of * @return Collection of test nodes. */ - private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) { - Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt); + private List<ClusterNode> createNodes(int nodesCnt, int replicas) { + List<ClusterNode> nodes = new ArrayList<>(nodesCnt); for (int i = 0; i < nodesCnt; i++) nodes.add(new TestRichNode(replicas)); @@ -72,14 +76,22 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe * @param aff Affinity to check. * @param nodes Collection of nodes to test on. */ - private void checkDistribution(RendezvousAffinityFunction aff, Collection<ClusterNode> nodes) { + private void checkDistribution(RendezvousAffinityFunction aff, List<ClusterNode> nodes) { Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); + GridAffinityFunctionContextImpl ctx = new GridAffinityFunctionContextImpl( + nodes, + null, + null, + new AffinityTopologyVersion(1, 0), + 0, + new AffinityAttachmentHolder() + ); + + List<List<ClusterNode>> affDist = aff.assignPartitions(ctx); + for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.assignPartition(part, - new ArrayList<ClusterNode>(nodes), - 0, - new HashMap<UUID, Collection<ClusterNode>>()); + Collection<ClusterNode> affNodes = affDist.get(part); assertEquals(1, affNodes.size()); @@ -134,7 +146,10 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe */ @SuppressWarnings("UnusedDeclaration") private TestRichNode(int replicas) { - this(UUID.randomUUID(), replicas); + super(UUID.randomUUID()); + + nodeId = id(); + this.replicas = replicas; } /** @@ -143,6 +158,8 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe * @param nodeId Node id. */ private TestRichNode(UUID nodeId, int replicas) { + super(nodeId); + this.nodeId = nodeId; this.replicas = replicas; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index db11291..1040a86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest; @@ -215,7 +216,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection private Collection<ClusterNode> nodes(AffinityFunction aff, int part, Collection<ClusterNode> nodes) { List<List<ClusterNode>> assignment = aff.assignPartitions( new GridAffinityFunctionContextImpl(new ArrayList<>(nodes), null, null, new AffinityTopologyVersion(1), - BACKUP_CNT)); + BACKUP_CNT, new AffinityAttachmentHolder())); return assignment.get(part); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 54328c6..441101a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -245,7 +246,8 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { null, null, topVer(1, 0), - cctx.config().getBackups()); + cctx.config().getBackups(), + new AffinityAttachmentHolder()); List<List<ClusterNode>> calcAff1_0 = func.assignPartitions(ctx); @@ -256,7 +258,8 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { calcAff1_0, null, topVer(1, 0), - cctx.config().getBackups()); + cctx.config().getBackups(), + new AffinityAttachmentHolder()); List<List<ClusterNode>> calcAff2_0 = func.assignPartitions(ctx); @@ -2457,7 +2460,8 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { previousAssignment(topVer, cacheDesc.cacheId()), evt, topVer0, - cacheDesc.cacheConfiguration().getBackups()); + cacheDesc.cacheConfiguration().getBackups(), + new AffinityAttachmentHolder()); List<List<ClusterNode>> assignment = func.assignPartitions(affCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/6b2ac9c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 9c39ad7..d9ad163 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.affinity.AffinityAttachmentHolder; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; @@ -721,7 +722,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac null, discoEvt, new AffinityTopologyVersion(topVer + 1), - 1); + 1, + new AffinityAttachmentHolder()); AffinityFunction affFunc = ignite.cache(null).getConfiguration(CacheConfiguration.class).getAffinity();
