ignite-3018 Cache affinity calculation is slow with large nodes number
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/027b2c27 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/027b2c27 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/027b2c27 Branch: refs/heads/ignite-4929 Commit: 027b2c27c8824ef1498883dff3af9d5be37a80b5 Parents: 76485fc Author: tledkov-gridgain <[email protected]> Authored: Thu Apr 13 14:55:39 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Thu Apr 13 14:55:39 2017 +0300 ---------------------------------------------------------------------- .../rendezvous/RendezvousAffinityFunction.java | 283 +++-- .../GridCachePartitionExchangeManager.java | 20 +- ...inityFunctionFastPowerOfTwoHashSelfTest.java | 17 - ...ndezvousAffinityFunctionSimpleBenchmark.java | 1100 ++++++++++++++++++ ...ousAffinityFunctionStandardHashSelfTest.java | 17 - .../IgniteClientReconnectCacheTest.java | 16 +- .../internal/binary/BinaryEnumsSelfTest.java | 2 + .../GridCachePartitionedAffinitySpreadTest.java | 169 --- ...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +- ...ridCachePartitionNotLoadedEventSelfTest.java | 2 + .../near/GridCacheNearTxForceKeyTest.java | 6 +- ...cheRebalancingPartitionDistributionTest.java | 2 +- ...gniteServiceConfigVariationsFullApiTest.java | 9 +- .../IgniteServiceDynamicCachesSelfTest.java | 12 +- .../ignite/testframework/GridTestNode.java | 12 +- ...PartitionOnAffinityRunAtomicCacheOpTest.java | 46 +- 16 files changed, 1364 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index dcac7d4..9c84f00 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -22,18 +22,16 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.UUID; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; @@ -48,7 +46,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; @@ -84,20 +81,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Comparator. */ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); - /** Thread local message digest. 
*/ - private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { - @Override protected MessageDigest initialValue() { - try { - return MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - assert false : "Should have failed in constructor"; - - throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e); - } - } - }; - /** Number of partitions. */ private int parts; @@ -121,10 +104,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Hash ID resolver. */ private AffinityNodeHashResolver hashIdRslvr = null; - /** Ignite instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** Logger instance. */ @LoggerResource private transient IgniteLogger log; @@ -195,13 +174,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza setPartitions(parts); this.backupFilter = backupFilter; - - try { - MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - throw new IgniteException("Failed to obtain MD5 message digest instance.", e); - } } /** @@ -382,116 +354,94 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** * Returns collection of nodes (primary first) for specified partition. * - * @param d Message digest. * @param part Partition. * @param nodes Nodes. - * @param nodesHash Serialized nodes hashes. * @param backups Number of backups. * @param neighborhoodCache Neighborhood. * @return Assignment. 
*/ - public List<ClusterNode> assignPartition(MessageDigest d, - int part, + public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, - Map<ClusterNode, byte[]> nodesHash, int backups, @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { if (nodes.size() <= 1) return nodes; - if (d == null) - d = digest.get(); - - List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size()); - - try { - for (int i = 0; i < nodes.size(); i++) { - ClusterNode node = nodes.get(i); + IgniteBiTuple<Long, ClusterNode> [] hashArr = + (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()]; - byte[] nodeHashBytes = nodesHash.get(node); + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); - if (nodeHashBytes == null) { - Object nodeHash = resolveNodeHash(node); + Object nodeHash = resolveNodeHash(node); - byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + long hash = hash(nodeHash.hashCode(), part); - // Add 4 bytes for partition bytes. - nodeHashBytes = new byte[nodeHashBytes0.length + 4]; - - System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); - - nodesHash.put(node, nodeHashBytes); - } - - U.intToBytes(part, nodeHashBytes, 0); - - d.reset(); - - byte[] bytes = d.digest(nodeHashBytes); + hashArr[i] = F.t(hash, node); + } - long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); + final int primaryAndBackups = backups == Integer.MAX_VALUE ? 
nodes.size() : Math.min(backups + 1, nodes.size()); - lst.add(F.t(hash, node)); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups); - Collections.sort(lst, COMPARATOR); + // REPLICATED cache case + if (backups == Integer.MAX_VALUE) + return replicatedAssign(nodes, sortedNodes); - int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); + Iterator<ClusterNode> it = sortedNodes.iterator(); List<ClusterNode> res = new ArrayList<>(primaryAndBackups); - ClusterNode primary = lst.get(0).get2(); + Collection<ClusterNode> allNeighbors = new HashSet<>(); + + ClusterNode primary = it.next(); res.add(primary); + if (exclNeighbors) + allNeighbors.addAll(neighborhoodCache.get(primary.id())); + // Select backups. if (backups > 0) { - for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); - - ClusterNode node = next.get2(); + while (it.hasNext() && res.size() < primaryAndBackups) { + ClusterNode node = it.next(); if (exclNeighbors) { - Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res); - - if (!allNeighbors.contains(node)) + if (!allNeighbors.contains(node)) { res.add(node); + + allNeighbors.addAll(neighborhoodCache.get(node.id())); + } + } + else if ((backupFilter != null && backupFilter.apply(primary, node)) + || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) + || (affinityBackupFilter == null && backupFilter == null) ) { + res.add(node); + + if (exclNeighbors) + allNeighbors.addAll(neighborhoodCache.get(node.id())); } - else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) - res.add(next.get2()); - else if (backupFilter != null && backupFilter.apply(primary, node)) - res.add(next.get2()); - else if (affinityBackupFilter == null && 
backupFilter == null) - res.add(next.get2()); } } if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria. - for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + it = sortedNodes.iterator(); + + it.next(); - ClusterNode node = next.get2(); + while (it.hasNext() && res.size() < primaryAndBackups) { + ClusterNode node = it.next(); if (!res.contains(node)) - res.add(next.get2()); + res.add(node); } if (!exclNeighborsWarn) { LT.warn(log, "Affinity function excludeNeighbors property is ignored " + - "because topology has no enough nodes to assign backups."); + "because topology has no enough nodes to assign backups.", + "Affinity function excludeNeighbors property is ignored " + + "because topology has no enough nodes to assign backups."); exclNeighborsWarn = true; } @@ -502,6 +452,53 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza return res; } + /** + * Creates assignment for REPLICATED cache + * + * @param nodes Topology. + * @param sortedNodes Sorted for specified partitions nodes. + * @return Assignment. + */ + private List<ClusterNode> replicatedAssign(List<ClusterNode> nodes, Iterable<ClusterNode> sortedNodes) { + ClusterNode primary = sortedNodes.iterator().next(); + + List<ClusterNode> res = new ArrayList<>(nodes.size()); + + res.add(primary); + + for (ClusterNode n : nodes) + if (!n.equals(primary)) + res.add(n); + + assert res.size() == nodes.size() : "Not enough backups: " + res.size(); + + return res; + } + + /** + * The pack partition number and nodeHash.hashCode to long and mix it by hash function based on the Wang/Jenkins + * hash. + * + * @param key0 Hash key. + * @param key1 Hash key. 
+ * @see <a href="https://gist.github.com/badboy/6267743#64-bit-mix-functions">64 bit mix functions</a> + * @return Long hash key. + */ + private static long hash(int key0, int key1) { + long key = (key0 & 0xFFFFFFFFL) + | ((key1 & 0xFFFFFFFFL) << 32); + + key = (~key) + (key << 21); // key = (key << 21) - key - 1; + key ^= (key >>> 24); + key += (key << 3) + (key << 8); // key * 265 + key ^= (key >>> 14); + key += (key << 2) + (key << 4); // key * 21 + key ^= (key >>> 28); + key += (key << 31); + + return key; + } + /** {@inheritDoc} */ @Override public void reset() { // No-op. @@ -534,19 +531,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; - MessageDigest d = digest.get(); - List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); - Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size()); - for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(d, - i, - nodes, - nodesHash, - affCtx.backups(), - neighborhoodCache); + List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache); assignments.add(partAssignment); } @@ -590,4 +578,83 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza o1.get2().id().compareTo(o2.get2().id()); } } + + /** + * Sorts the initial array with linear sort algorithm array + */ + private static class LazyLinearSortedContainer implements Iterable<ClusterNode> { + /** Initial node-hash array. */ + private final IgniteBiTuple<Long, ClusterNode>[] arr; + + /** Count of the sorted elements */ + private int sorted; + + /** + * @param arr Node / partition hash list. + * @param needFirstSortedCnt Estimate count of elements to return by iterator. 
+ */ + LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) { + this.arr = arr; + + if (needFirstSortedCnt > (int)Math.log(arr.length)) { + Arrays.sort(arr, COMPARATOR); + + sorted = arr.length; + } + } + + /** {@inheritDoc} */ + @Override public Iterator<ClusterNode> iterator() { + return new SortIterator(); + } + + /** + * + */ + private class SortIterator implements Iterator<ClusterNode> { + /** Index of the first unsorted element. */ + private int cur; + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return cur < arr.length; + } + + /** {@inheritDoc} */ + @Override public ClusterNode next() { + if (!hasNext()) + throw new NoSuchElementException(); + + if (cur < sorted) + return arr[cur++].get2(); + + IgniteBiTuple<Long, ClusterNode> min = arr[cur]; + + int minIdx = cur; + + for (int i = cur + 1; i < arr.length; i++) { + if (COMPARATOR.compare(arr[i], min) < 0) { + minIdx = i; + + min = arr[i]; + } + } + + if (minIdx != cur) { + arr[minIdx] = arr[cur]; + + arr[cur] = min; + } + + sorted = cur++; + + return min.get2(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException("Remove doesn't supported"); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1297c38..9350b2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -781,17 +781,31 @@ 
public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); - Collection<ClusterNode> rmts; - // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { + // Check rebalance state & send CacheAffinityChangeMessage if need. + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + if (cacheCtx == null) + continue; + + GridDhtPartitionTopology top = null; + + if (!cacheCtx.isLocal()) + top = cacheCtx.topology(); + + if (top != null) + cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId()); + } + } + GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut; // No need to send to nodes which did not finish their first exchange. AffinityTopologyVersion rmtTopVer = lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE; - rmts = CU.remoteNodes(cctx, rmtTopVer); + Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java index 683ffa2..dfebdbd 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest.java @@ 
-17,34 +17,17 @@ package org.apache.ignite.cache.affinity.rendezvous; -import org.apache.ignite.Ignite; import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.testframework.GridTestUtils; /** * Tests for {@link RendezvousAffinityFunction}. */ public class RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest extends AbstractAffinityFunctionSelfTest { - /** Ignite. */ - private static Ignite ignite; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite = startGrid(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - /** {@inheritDoc} */ @Override protected AffinityFunction affinityFunction() { AffinityFunction aff = new RendezvousAffinityFunction(512, null); - GridTestUtils.setFieldValue(aff, "ignite", ignite); - return aff; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java new file mode 100644 index 0000000..3e5bae9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSimpleBenchmark.java @@ -0,0 +1,1100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity.rendezvous; + +import java.io.Externalizable; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.AffinityNodeHashResolver; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; +import org.apache.ignite.internal.util.typedef.F; +import 
org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestNode; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Simple benchmarks, compatibility test and distribution check utils for affinity functions. + * Needs to check changes at the {@link RendezvousAffinityFunction}. + */ +public class RendezvousAffinityFunctionSimpleBenchmark extends GridCommonAbstractTest { + /** MAC prefix. */ + private static final String MAC_PREF = "MAC"; + + /** Ignite. */ + private static Ignite ignite; + + /** Max experiments. */ + private static final int MAX_EXPERIMENTS = 200; + + /** Max experiments. */ + private TopologyModificationMode mode = TopologyModificationMode.CHANGE_LAST_NODE; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 3600 * 1000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param nodesCnt Count of nodes to generate. + * @return Nodes list. 
+ */ + private List<ClusterNode> createBaseNodes(int nodesCnt) { + List<ClusterNode> nodes = new ArrayList<>(nodesCnt); + + for (int i = 0; i < nodesCnt; i++) { + GridTestNode node = new GridTestNode(UUID.randomUUID()); + + // two neighbours nodes + node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + i / 2); + + nodes.add(node); + } + return nodes; + } + + /** + * Modify the topology by remove the last / add new node. + * + * @param nodes Topology. + * @param prevAssignment Previous afinity. + * @param iter Number of iteration. + * @param backups Backups count. + * @return Affinity context. + */ + private GridAffinityFunctionContextImpl nodesModificationChangeLast(List<ClusterNode> nodes, + List<List<ClusterNode>> prevAssignment, int iter, int backups) { + DiscoveryEvent discoEvt; + + discoEvt = iter % 2 == 0 ? addNode(nodes, iter) : removeNode(nodes, nodes.size() - 1); + + return new GridAffinityFunctionContextImpl(nodes, + prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups); + } + + /** + * @param nodes Topology. + * @param idx Index of node to remove. + * @return Discovery event. + */ + @NotNull private DiscoveryEvent removeNode(List<ClusterNode> nodes, int idx) { + return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_LEFT, nodes.remove(idx)); + } + + /** + * Modify the topology by remove the first node / add new node + * + * @param nodes Topology. + * @param prevAssignment Previous affinity. + * @param iter Number of iteration. + * @param backups Backups count. + * @return Affinity context. + */ + private GridAffinityFunctionContextImpl nodesModificationChangeFirst(List<ClusterNode> nodes, + List<List<ClusterNode>> prevAssignment, int iter, int backups) { + DiscoveryEvent discoEvt; + + discoEvt = iter % 2 == 0 ? 
addNode(nodes, iter) : removeNode(nodes, 0); + + return new GridAffinityFunctionContextImpl(nodes, + prevAssignment, discoEvt, new AffinityTopologyVersion(nodes.size()), backups); + } + + /** + * @param nodes Topology. + * @param iter Iteration count. + * @return Discovery event. + */ + @NotNull private DiscoveryEvent addNode(List<ClusterNode> nodes, int iter) { + GridTestNode node = new GridTestNode(UUID.randomUUID()); + + // two neighbours nodes + node.setAttribute(IgniteNodeAttributes.ATTR_MACS, MAC_PREF + "_add_" + iter / 4); + + nodes.add(node); + + return new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, node); + } + + /** + * + * @param aff Affinity function. + * @param nodes Topology. + * @param iter Number of iteration. + * @param prevAssignment Previous affinity assignment. + * @param backups Backups count. + * @return Tuple with affinity and time spend of the affinity calculation. + */ + private IgniteBiTuple<Long, List<List<ClusterNode>>> assignPartitions(AffinityFunction aff, + List<ClusterNode> nodes, List<List<ClusterNode>> prevAssignment, int backups, int iter) { + + GridAffinityFunctionContextImpl ctx = null; + switch (mode) { + case CHANGE_LAST_NODE: + ctx = nodesModificationChangeLast(nodes, prevAssignment, iter, backups); + break; + case CHANGE_FIRST_NODE: + ctx = nodesModificationChangeFirst(nodes, prevAssignment, iter, backups); + break; + + case ADD: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, addNode(nodes, iter), new AffinityTopologyVersion(nodes.size()), backups); + break; + + case REMOVE_RANDOM: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, removeNode(nodes, nodes.size() - 1), + new AffinityTopologyVersion(nodes.size()), backups); + break; + + case NONE: + ctx = new GridAffinityFunctionContextImpl(nodes, + prevAssignment, + new DiscoveryEvent(nodes.get(0), "", EventType.EVT_NODE_JOINED, nodes.get(nodes.size() - 1)), + new AffinityTopologyVersion(nodes.size()), backups); + 
break; + + } + + long start = System.currentTimeMillis(); + + List<List<ClusterNode>> assignments = aff.assignPartitions(ctx); + + return F.t(System.currentTimeMillis() - start, assignments); + } + + /** + * @param lst List pf measures. + * @return Average of measures. + */ + private double average(Collection<Long> lst) { + if (lst.isEmpty()) + return 0; + + long sum = 0; + + for (long l : lst) + sum += l; + + return (double)sum / lst.size(); + } + + /** + * @param lst List pf measures. + * @param avg Average of the measures. + * @return Variance of the measures. + */ + private double variance(Collection<Long> lst, double avg) { + if (lst.isEmpty()) + return 0; + + long sum = 0; + + for (long l : lst) + sum += (l - avg) * (l - avg); + + return Math.sqrt((double)sum / lst.size()); + } + + /** + * The table with count of partitions on node: + * + * column 0 - primary partitions counts + * column 1 - backup#0 partitions counts + * etc + * + * Rows correspond to the nodes. + * + * @param lst Affinity result. + * @param nodes Topology. + * @return Frequency distribution: counts of partitions on node. 
+ */ + private static List<List<Integer>> freqDistribution(List<List<ClusterNode>> lst, Collection<ClusterNode> nodes) { + List<Map<ClusterNode, AtomicInteger>> nodeMaps = new ArrayList<>(); + + int backups = lst.get(0).size(); + + for (int i = 0; i < backups; ++i) { + Map<ClusterNode, AtomicInteger> map = new HashMap<>(); + + for (List<ClusterNode> l : lst) { + ClusterNode node = l.get(i); + + if (!map.containsKey(node)) + map.put(node, new AtomicInteger(1)); + else + map.get(node).incrementAndGet(); + } + + nodeMaps.add(map); + } + + List<List<Integer>> byNodes = new ArrayList<>(nodes.size()); + for (ClusterNode node : nodes) { + List<Integer> byBackups = new ArrayList<>(backups); + + for (int j = 0; j < backups; ++j) { + if (nodeMaps.get(j).get(node) == null) + byBackups.add(0); + else + byBackups.add(nodeMaps.get(j).get(node).get()); + } + + byNodes.add(byBackups); + } + return byNodes; + } + + /** + * @param byNodes Frequency distribution. + * @param suffix Label suffix. + * @throws IOException On error. + */ + private void printDistribution(Collection<List<Integer>> byNodes, String suffix) throws IOException { + int nodes = byNodes.size(); + + try (PrintStream ps = new PrintStream(Files.newOutputStream(FileSystems.getDefault() + .getPath(String.format("%03d", nodes) + suffix)))) { + + for (List<Integer> byNode : byNodes) { + for (int w : byNode) + ps.print(String.format("%05d ", w)); + + ps.println(""); + } + } + } + + /** + * Chi-square test of the distribution with uniform distribution. + * + * @param byNodes Distribution. + * @param parts Partitions count. + * @param goldenNodeWeight Weight of according the uniform distribution. + * @return Chi-square test. 
+ */ + private double chiSquare(List<List<Integer>> byNodes, int parts, double goldenNodeWeight) { + double sum = 0; + + for (List<Integer> byNode : byNodes) { + double w = (double)byNode.get(0) / parts; + + sum += (goldenNodeWeight - w) * (goldenNodeWeight - w) / goldenNodeWeight; + } + return sum; + } + + /** + * @throws IOException On error. + */ + public void testDistribution() throws IOException { + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024); + + AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff1, "ignite", ignite); + + affinityDistribution(aff0, aff1); + } + + /** + * + * @param aff0 Affinity function to compare. + * @param aff1 Affinity function to compare. + */ + private void affinityDistribution(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {5, 64, 100, 128, 200, 256, 300, 400, 500, 600}; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + assignPartitions(aff0, nodes0, null, 2, 0).get2(); + List<List<ClusterNode>> lst0 = assignPartitions(aff0, nodes0, null, 2, 1).get2(); + + assignPartitions(aff1, nodes1, null, 2, 0).get2(); + List<List<ClusterNode>> lst1 = assignPartitions(aff1, nodes1, null, 2, 1).get2(); + + List<List<Integer>> dist0 = freqDistribution(lst0, nodes0); + List<List<Integer>> dist1 = freqDistribution(lst1, nodes1); + + info(String.format("Chi^2. Test %d nodes. %s: %f; %s: %f;", + nodesCnt, + aff0.getClass().getSimpleName(), + chiSquare(dist0, aff0.partitions(), 1.0 / nodesCnt), + aff1.getClass().getSimpleName(), + chiSquare(dist1, aff0.partitions(), 1.0 / nodesCnt))); + + try { + printDistribution(dist0, "." + aff0.getClass().getSimpleName()); + printDistribution(dist1, "." 
+ aff1.getClass().getSimpleName()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * + */ + public void testAffinityBenchmarkAdd() { + mode = TopologyModificationMode.ADD; + + AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff0, "ignite", ignite); + + affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024)); + } + + /** + * + */ + public void testAffinityBenchmarkChangeLast() { + mode = TopologyModificationMode.CHANGE_LAST_NODE; + + AffinityFunction aff0 = new RendezvousAffinityFunctionOld(true, 1024); + + GridTestUtils.setFieldValue(aff0, "ignite", ignite); + + affinityBenchmark(aff0, new RendezvousAffinityFunction(true, 1024)); + } + + /** + * @param aff0 Affinity function. to compare. + * @param aff1 Affinity function. to compare. + */ + private void affinityBenchmark(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {100, 4, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + List<Long> times0 = new ArrayList<>(MAX_EXPERIMENTS); + List<Long> times1 = new ArrayList<>(MAX_EXPERIMENTS); + + List<List<ClusterNode>> prevAssignment = + assignPartitions(aff0, nodes0, null, backups, 0).get2(); + + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + IgniteBiTuple<Long, List<List<ClusterNode>>> aa + = assignPartitions(aff0, nodes0, prevAssignment, backups, i); + + prevAssignment = aa.get2(); + + times0.add(aa.get1()); + } + + prevAssignment = assignPartitions(aff1, nodes1, null, backups, 0).get2(); + + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + IgniteBiTuple<Long, List<List<ClusterNode>>> aa + = assignPartitions(aff1, nodes1, prevAssignment, backups, i); + + prevAssignment = aa.get2(); + + times1.add(aa.get1()); + } + + double avr0 = average(times0); + double var0 = variance(times0, avr0); + + 
double avr1 = average(times1); + double var1 = variance(times1, avr1); + + info(String.format("Test %d nodes. %s: %.1f ms +/- %.3f ms; %s: %.1f ms +/- %.3f ms;", + nodesCnt, + aff0.getClass().getSimpleName(), + avr0, var0, + aff1.getClass().getSimpleName(), + avr1, var1)); + } + } + + /** + * + * @param affOld Old affinity. + * @param affNew New affinity. + * @return Count of partitions to migrate. + */ + private int countPartitionsToMigrate(List<List<ClusterNode>> affOld, List<List<ClusterNode>> affNew) { + if (affOld == null || affNew == null) + return 0; + + assertEquals(affOld.size(), affNew.size()); + + int diff = 0; + for (int i = 0; i < affOld.size(); ++i) { + Collection<ClusterNode> s0 = new HashSet<>(affOld.get(i)); + Iterable<ClusterNode> s1 = new HashSet<>(affNew.get(i)); + + for (ClusterNode n : s1) { + if (!s0.contains(n)) + ++diff; + } + } + + return diff; + } + + /** + * + */ + public void testPartitionsMigrate() { + int[] nodesCnts = {2, 3, 10, 64, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 256); + AffinityFunction aff1 = new FairAffinityFunction(true, 256); + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes0 = createBaseNodes(nodesCnt); + List<ClusterNode> nodes1 = createBaseNodes(nodesCnt); + + List<List<ClusterNode>> affPrev = null; + + int diffCnt0 = 0; + + affPrev = assignPartitions(aff0, nodes0, null, backups, 0).get2(); + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + List<List<ClusterNode>> affCur = assignPartitions(aff0, nodes0, affPrev, backups, i).get2(); + diffCnt0 += countPartitionsToMigrate(affPrev, affCur); + affPrev = affCur; + } + + affPrev = assignPartitions(aff1, nodes1, null, backups, 0).get2(); + int diffCnt1 = 0; + for (int i = 0; i < MAX_EXPERIMENTS; ++i) { + List<List<ClusterNode>> affCur = assignPartitions(aff1, nodes1, affPrev, backups, i).get2(); + diffCnt1 += countPartitionsToMigrate(affPrev, affCur); + affPrev = affCur; + } + + 
double goldenChangeAffinity = (double)aff1.partitions() / nodesCnt * (backups + 1); + info(String.format("Test %d nodes. Golden: %.1f; %s: %.1f; %s: %.1f;", + nodesCnt, goldenChangeAffinity, + aff0.getClass().getSimpleName(), + (double)diffCnt0 / (MAX_EXPERIMENTS - 1), + aff1.getClass().getSimpleName(), + (double)diffCnt1 / (MAX_EXPERIMENTS - 1))); + } + } + + /** + * + */ + public void _testAffinityCompatibility() { + mode = TopologyModificationMode.ADD; + + AffinityFunction aff0 = new RendezvousAffinityFunction(true, 1024); + + // Use the full copy of the old implementation of the RendezvousAffinityFunction to check the compatibility. + AffinityFunction aff1 = new RendezvousAffinityFunctionOld(true, 1024); + GridTestUtils.setFieldValue(aff1, "ignite", ignite); + + affinityCompatibility(aff0, aff1); + } + + /** + * @param aff0 Affinity function to compare. + * @param aff1 Affinity function to compare. + */ + private void affinityCompatibility(AffinityFunction aff0, AffinityFunction aff1) { + int[] nodesCnts = {64, 100, 200, 300, 400, 500, 600}; + + final int backups = 2; + + mode = TopologyModificationMode.NONE; + + for (int nodesCnt : nodesCnts) { + List<ClusterNode> nodes = createBaseNodes(nodesCnt); + + List<List<ClusterNode>> assignment0 = assignPartitions(aff0, nodes, null, backups, 0).get2(); + + List<List<ClusterNode>> assignment1 = assignPartitions(aff1, nodes, null, backups, 0).get2(); + + assertEquals (assignment0, assignment1); + } + } + + /** + * + */ + private enum TopologyModificationMode { + /** Change the last node. */ + CHANGE_LAST_NODE, + + /** Change the first node. */ + CHANGE_FIRST_NODE, + + /** Add. */ + ADD, + + /** Remove random. */ + REMOVE_RANDOM, + + /** Do nothing. */ + NONE + } + + /** + * Full copy of the old implementation of the RendezvousAffinityFunction to check compatibility and performance. 
+ */ + private static class RendezvousAffinityFunctionOld implements AffinityFunction, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Default number of partitions. */ + public static final int DFLT_PARTITION_COUNT = 1024; + + /** Comparator. */ + private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator(); + + /** Thread local message digest. */ + private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { + @Override protected MessageDigest initialValue() { + try { + return MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + assert false : "Should have failed in constructor"; + + throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e); + } + } + }; + + /** Number of partitions. */ + private int parts; + + /** Exclude neighbors flag. */ + private boolean exclNeighbors; + + /** Exclude neighbors warning. */ + private transient boolean exclNeighborsWarn; + + /** Optional backup filter. First node is primary, second node is a node being tested. */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + + /** Optional affinity backups filter. The first node is a node being tested, + * the second is a list of nodes that are already assigned for a given partition (the first node in the list + * is primary). */ + private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter; + + /** Hash ID resolver. */ + private AffinityNodeHashResolver hashIdRslvr = null; + + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger instance. */ + @LoggerResource + private transient IgniteLogger log; + + /** + * Empty constructor with all defaults. + */ + public RendezvousAffinityFunctionOld() { + this(false); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and specified number of backups. 
+ * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + */ + public RendezvousAffinityFunctionOld(boolean exclNeighbors) { + this(exclNeighbors, DFLT_PARTITION_COUNT); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of backups and partitions. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts) { + this(exclNeighbors, parts, null); + } + + /** + * Initializes optional counts for replicas and backups. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param parts Total number of partitions. + * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + */ + public RendezvousAffinityFunctionOld(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this(false, parts, backupFilter); + } + + /** + * Private constructor. + * + * @param exclNeighbors Exclude neighbors flag. + * @param parts Partitions count. + * @param backupFilter Backup filter. 
+ */ + private RendezvousAffinityFunctionOld(boolean exclNeighbors, int parts, + IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts > 0, "parts > 0"); + + this.exclNeighbors = exclNeighbors; + this.parts = parts; + this.backupFilter = backupFilter; + + try { + MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + throw new IgniteException("Failed to obtain MD5 message digest instance.", e); + } + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. + */ + public void setPartitions(int parts) { + A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT); + + this.parts = parts; + } + + /** + * Gets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @return Hash ID resolver. + */ + @Deprecated + public AffinityNodeHashResolver getHashIdResolver() { + return hashIdRslvr; + } + + /** + * Sets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. 
+ * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @param hashIdRslvr Hash ID resolver. + * + * @deprecated Use {@link IgniteConfiguration#setConsistentId(Serializable)} instead. + */ + @Deprecated + public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) { + this.hashIdRslvr = hashIdRslvr; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param backupFilter Optional backup filter. + * @deprecated Use {@code affinityBackupFilter} instead. + */ + @Deprecated + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. 
First node passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is + * the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() { + return affinityBackupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is + * the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param affinityBackupFilter Optional backup filter. + */ + public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, + List<ClusterNode>> affinityBackupFilter) { + this.affinityBackupFilter = affinityBackupFilter; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. 
+ */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Resolves node hash. + * + * @param node Cluster node; + * @return Node hash. + */ + public Object resolveNodeHash(ClusterNode node) { + if (hashIdRslvr != null) + return hashIdRslvr.resolve(node); + else + return node.consistentId(); + } + + /** + * Returns collection of nodes (primary first) for specified partition. + * + * @param d Message digest. + * @param part Partition. + * @param nodes Nodes. + * @param nodesHash Serialized nodes hashes. + * @param backups Number of backups. + * @param neighborhoodCache Neighborhood. + * @return Assignment. + */ + public List<ClusterNode> assignPartition(MessageDigest d, + int part, + List<ClusterNode> nodes, + Map<ClusterNode, byte[]> nodesHash, + int backups, + @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { + if (nodes.size() <= 1) + return nodes; + + if (d == null) + d = digest.get(); + + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size()); + + try { + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + byte[] nodeHashBytes = nodesHash.get(node); + + if (nodeHashBytes == null) { + Object nodeHash = resolveNodeHash(node); + + byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash); + + // Add 4 bytes for partition bytes. 
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4]; + + System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length); + + nodesHash.put(node, nodeHashBytes); + } + + U.intToBytes(part, nodeHashBytes, 0); + + d.reset(); + + byte[] bytes = d.digest(nodeHashBytes); + + long hash = + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); + + lst.add(F.t(hash, node)); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + Collections.sort(lst, COMPARATOR); + + int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size()); + + List<ClusterNode> res = new ArrayList<>(primaryAndBackups); + + ClusterNode primary = lst.get(0).get2(); + + res.add(primary); + + // Select backups. + if (backups > 0) { + for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (exclNeighbors) { + Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res); + + if (!allNeighbors.contains(node)) + res.add(node); + } + else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) + res.add(next.get2()); + else if (backupFilter != null && backupFilter.apply(primary, node)) + res.add(next.get2()); + else if (affinityBackupFilter == null && backupFilter == null) + res.add(next.get2()); + } + } + + if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { + // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria. 
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (!res.contains(node)) + res.add(next.get2()); + } + + if (!exclNeighborsWarn) { + LT.warn(log, "Affinity function excludeNeighbors property is ignored " + + "because topology has no enough nodes to assign backups."); + + exclNeighborsWarn = true; + } + } + + assert res.size() <= primaryAndBackups; + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + if (key == null) + throw new IllegalArgumentException("Null key is passed for a partition calculation. " + + "Make sure that an affinity key that is used is initialized properly."); + + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<List<ClusterNode>> assignments = new ArrayList<>(parts); + + Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? + GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; + + MessageDigest d = digest.get(); + + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size()); + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAssignment = assignPartition(d, + i, + nodes, + nodesHash, + affCtx.backups(), + neighborhoodCache); + + assignments.add(partAssignment); + } + + return assignments; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(parts); + out.writeBoolean(exclNeighbors); + out.writeObject(hashIdRslvr); + out.writeObject(backupFilter); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + parts = in.readInt(); + exclNeighbors = in.readBoolean(); + hashIdRslvr = (AffinityNodeHashResolver)in.readObject(); + backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); + } + + /** + * + */ + private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { + return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 : + o1.get2().id().compareTo(o2.get2().id()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java index ed47c57..cffa277 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionStandardHashSelfTest.java @@ -17,34 +17,17 @@ package org.apache.ignite.cache.affinity.rendezvous; -import org.apache.ignite.Ignite; import 
org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest; import org.apache.ignite.cache.affinity.AffinityFunction; -import org.apache.ignite.testframework.GridTestUtils; /** * Tests for {@link RendezvousAffinityFunction}. */ public class RendezvousAffinityFunctionStandardHashSelfTest extends AbstractAffinityFunctionSelfTest { - /** Ignite. */ - private static Ignite ignite; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite = startGrid(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - /** {@inheritDoc} */ @Override protected AffinityFunction affinityFunction() { AffinityFunction aff = new RendezvousAffinityFunction(513, null); - GridTestUtils.setFieldValue(aff, "ignite", ignite); - return aff; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 01aa256..dff827d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -101,6 +101,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac private static final String STATIC_CACHE = "static-cache"; /** */ + private static final int CACHE_PUTS_CNT = 3; + + /** */ private UUID nodeId; /** {@inheritDoc} */ @@ -580,17 +583,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac new CI1<IgniteCache<Object, Object>>() { @Override public void apply(IgniteCache<Object, Object> cache) { try (Transaction tx = 
client.transactions().txStart(txConcurrency, REPEATABLE_READ)) { - log.info("Put1: " + key); - - cache.put(key, key); - - Integer key2 = key + 1; - - log.info("Put2: " + key2); - - cache.put(key2, key2); - - log.info("Commit [key1=" + key + ", key2=" + key2 + ']'); + for (int i = 0; i < CACHE_PUTS_CNT; ++i) + cache.put(key + i, key + i); tx.commit(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java index ed473d8..6cac96c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryEnumsSelfTest.java @@ -114,6 +114,8 @@ public class BinaryEnumsSelfTest extends GridCommonAbstractTest { node2 = startGrid(1); cache2 = node2.cache(CACHE_NAME); cacheBinary2 = cache2.withKeepBinary(); + + awaitPartitionMapExchange(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java deleted file mode 100644 index 2d46cf4..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.testframework.GridTestNode; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTest { - /** */ - public static final int NODES_CNT = 50; - - /** - * @throws Exception If failed. - */ - public void testPartitionSpreading() throws Exception { - System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", "Max", "Dev"); - - for (int i = 5; i < NODES_CNT; i = i * 3 / 2) { - for (int replicas = 128; replicas <= 4096; replicas*=2) { - Collection<ClusterNode> nodes = createNodes(i, replicas); - - RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 10000); - - checkDistribution(aff, nodes); - } - - System.out.println(); - } - } - - /** - * @param nodesCnt Nodes count. - * @param replicas Value of - * @return Collection of test nodes. 
- */ - private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) { - Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt); - - for (int i = 0; i < nodesCnt; i++) - nodes.add(new TestRichNode(replicas)); - - return nodes; - } - - /** - * @param aff Affinity to check. - * @param nodes Collection of nodes to test on. - */ - private void checkDistribution(RendezvousAffinityFunction aff, Collection<ClusterNode> nodes) { - Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); - - for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.assignPartition(null, - part, - new ArrayList<>(nodes), - new HashMap<ClusterNode, byte[]>(), - 0, - null); - - assertEquals(1, affNodes.size()); - - ClusterNode node = F.first(affNodes); - - parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1); - } - - int min = Integer.MAX_VALUE; - int max = Integer.MIN_VALUE; - int total = 0; - - float mean = 0; - float m2 = 0; - int n = 0; - - for (ClusterNode node : nodes) { - int partsCnt = parts.get(node) != null ? parts.get(node) : 0; - - total += partsCnt; - - if (partsCnt < min) - min = partsCnt; - - if (partsCnt > max) - max = partsCnt; - - n++; - float delta = partsCnt - mean; - mean += delta / n; - m2 += delta * (partsCnt - mean); - } - - m2 /= (n - 1); - assertEquals(aff.getPartitions(), total); - - System.out.printf("%6s, %6s, %6s, %8.4f\n", nodes.size(),min, max, Math.sqrt(m2)); - } - - /** - * Rich node stub to use in emulated server topology. - */ - private static class TestRichNode extends GridTestNode { - /** */ - private final UUID nodeId; - - /** */ - private final int replicas; - - /** - * Externalizable class requires public no-arg constructor. - */ - @SuppressWarnings("UnusedDeclaration") - private TestRichNode(int replicas) { - this(UUID.randomUUID(), replicas); - } - - /** - * Constructs rich node stub to use in emulated server topology. - * - * @param nodeId Node id. 
- */ - private TestRichNode(UUID nodeId, int replicas) { - this.nodeId = nodeId; - this.replicas = replicas; - } - - /** - * Unused constructor for externalizable support. - */ - public TestRichNode() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public <T> T attribute(String name) { - return super.attribute(name); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index db11291..c17e9f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection * @throws Exception If failed. 
*/ public void testQueue() throws Exception { - final String queueName = "q"; + final String queueName = "qq"; System.out.println(U.filler(20, '\n')); http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java index 9e79a27..c8568d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java @@ -102,6 +102,8 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract startGrid(1); startGrid(2); + awaitPartitionMapExchange(); + final PartitionNotFullyLoadedListener lsnr = new PartitionNotFullyLoadedListener(); ignite(2).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java index f1c791e..47d54d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java @@ -71,13 +71,13 @@ public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest { Ignite ignite1 = startGrid(1); + awaitPartitionMapExchange(); + // This key should become primary for ignite1. - final Integer key = ignite0.configuration().getMarshaller() instanceof OptimizedMarshaller ? 2 : 7; + final Integer key = primaryKey(ignite1.cache(null)); assertNull(cache.getAndPut(key, key)); - awaitPartitionMapExchange(); - assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java index 61ee9ea..eebafed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java @@ -52,7 +52,7 @@ public class GridCacheRebalancingPartitionDistributionTest extends GridRollingRe .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) .setCacheMode(CacheMode.PARTITIONED) .setBackups(1) - .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 271)) + .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 1024)) .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK) 
.setRebalanceMode(CacheRebalanceMode.SYNC) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java index 0e33650..c0f836b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceConfigVariationsFullApiTest.java @@ -25,6 +25,8 @@ import java.io.Serializable; import java.util.concurrent.ThreadLocalRandom; import javax.cache.configuration.Factory; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryReader; @@ -109,7 +111,12 @@ public class IgniteServiceConfigVariationsFullApiTest extends IgniteConfigVariat @Override public void run(IgniteServices services, String svcName, TestService svc) { IgniteCache<Object, Object> cache = grid(testedNodeIdx).getOrCreateCache(CACHE_NAME); - services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), "1"); + try { + services.deployKeyAffinitySingleton(svcName, (Service)svc, cache.getName(), primaryKey(cache)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } })); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java index 026e6a6..fb6c0f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceDynamicCachesSelfTest.java @@ -83,7 +83,7 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest { final String svcName = "myService"; - svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key"); + svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, primaryKey(ig.cache(cacheName))); boolean res = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { @@ -125,7 +125,15 @@ public class IgniteServiceDynamicCachesSelfTest extends GridCommonAbstractTest { final String svcName = "myService"; - svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, "key"); + ig.createCache(ccfg); + + Object key = primaryKey(ig.cache(cacheName)); + + ig.destroyCache(cacheName); + + awaitPartitionMapExchange(); + + svcs.deployKeyAffinitySingleton(svcName, new TestService(), cacheName, key); assert svcs.service(svcName) == null; http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index 09d4765..d331387 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -53,8 +53,8 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod /** */ private UUID id; - /** */ - private Object consistentId = consistentIdCtr.incrementAndGet(); + /** String objects as a consistent Id is closer to real case than Integer */ + private Object consistentId = "Node_" + consistentIdCtr.incrementAndGet(); /** */ private ClusterMetrics metrics; @@ -247,13 +247,5 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod /** {@inheritDoc} */ @Override public String toString() { return id.toString(); -// StringBuilder buf = new StringBuilder(); -// -// buf.append(getClass().getSimpleName()); -// buf.append(" [attrs=").append(attrs); -// buf.append(", id=").append(id); -// buf.append(']'); -// -// return buf.toString(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/027b2c27/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java index 7f59a4b..71e737f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java @@ -21,9 +21,11 @@ import 
java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; @@ -157,10 +159,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { @Override public void run() { for (int i = 0; i < PARTS_CNT; ++i) { - grid(0).compute().affinityRun( - Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), - new Integer(i), - new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName)); + try { + grid(0).compute().affinityRun( + Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), + new Integer(i), + new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName)); + } + catch (IgniteException e) { + checkException(e, ClusterTopologyException.class); + } } } }, AFFINITY_THREADS_CNT, "affinity-run"); @@ -204,10 +211,15 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni if (System.currentTimeMillis() >= endTime) break; - grid(0).compute().affinityRun( - Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), - new Integer(i), - new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT)); + try { + grid(0).compute().affinityRun( + Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), + new Integer(i), + new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT)); + } + catch 
(IgniteException e) { + checkException(e, ClusterTopologyException.class); + } } } }, AFFINITY_THREADS_CNT, "affinity-run"); @@ -229,6 +241,24 @@ public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends Igni } } + + /** + * + * @param e Exception to check. + * @param exCls Expected exception cause class. + */ + private void checkException(IgniteException e, Class<? extends Exception> exCls) { + for (Throwable t = e; t.getCause() != null; t = t.getCause()) { + if (t.getCause().getClass().isAssignableFrom(exCls)) { + log.info("Expected exception: " + e); + + return; + } + } + + throw e; + } + /** */ private static class NotReservedCacheOpAffinityRun implements IgniteRunnable { /** Org id. */
