IGNITE-2655: AffinityFunction: primary and backup copies in different locations. Reviewed and merged by Denis Magda ([email protected]).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f175d3c6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f175d3c6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f175d3c6 Branch: refs/heads/master Commit: f175d3c670025bd619ec347dba2a5c5f68f4cc32 Parents: 3ca9feb Author: Vladislav Pyatkov <[email protected]> Authored: Thu Jun 2 16:14:10 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Thu Jun 2 16:14:10 2016 +0300 ---------------------------------------------------------------------- .../affinity/fair/FairAffinityFunction.java | 81 +++++++++++- .../rendezvous/RendezvousAffinityFunction.java | 39 +++++- ...ityFunctionBackupFilterAbstractSelfTest.java | 131 ++++++++++++++++++- ...airAffinityFunctionBackupFilterSelfTest.java | 9 ++ ...ousAffinityFunctionBackupFilterSelfTest.java | 9 ++ 5 files changed, 260 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java index b42b683..b6b14ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java @@ -102,6 +102,9 @@ public class FairAffinityFunction implements AffinityFunction { /** Optional backup filter. First node is primary, second node is a node being tested. */ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + /** Optional affinity backups filter. 
The first node is a node being tested, the second is a list of nodes that are already assigned for a given partition (primary node is the first in the list). */ + private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter; + /** * Empty constructor with all defaults. */ @@ -220,12 +223,40 @@ public class FairAffinityFunction implements AffinityFunction { * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param backupFilter Optional backup filter. + * @deprecated Use {@code affinityBackupFilter} instead. */ + @Deprecated public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { this.backupFilter = backupFilter; } /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() { + return affinityBackupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param affinityBackupFilter Optional backup filter. 
+ */ + public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter) { + this.affinityBackupFilter = affinityBackupFilter; + } + + /** * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). * <p> * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. @@ -865,6 +896,7 @@ public class FairAffinityFunction implements AffinityFunction { * @param tier Tier. * @param node Node. * @param allowNeighbors Allow neighbors. + * @return {@code true} if the partition is assignable to the node. */ private boolean isAssignable(int part, int tier, final ClusterNode node, boolean allowNeighbors) { if (containsPartition(part, node)) @@ -872,9 +904,50 @@ public class FairAffinityFunction implements AffinityFunction { if (exclNeighbors) return allowNeighbors || !neighborsContainPartition(node, part); - else if (backupFilter == null) - return true; - else { + else if (affinityBackupFilter != null) { + List<ClusterNode> assigment = assignments.get(part); + + assert assigment.size() > 0; + + List<ClusterNode> newAssignment; + + if (tier == 0) { + for (int t = 1; t < assigment.size(); t++) { + newAssignment = new ArrayList<>(assigment.size() - 1); + + newAssignment.add(node); + + if (t != 1) + newAssignment.addAll(assigment.subList(1, t)); + + if (t + 1 < assigment.size()) + newAssignment.addAll(assigment.subList(t + 1, assigment.size())); + + if (!affinityBackupFilter.apply(assigment.get(t), newAssignment)) + return false; + + } + + return true; + } + else if (tier < assigment.size()) { + newAssignment = new ArrayList<>(assigment.size() - 1); + + int i = 0; + + for (ClusterNode assignmentNode: assigment) { + if (i != tier) + newAssignment.add(assignmentNode); + + i++; + } + } + else + newAssignment = assigment; + + return affinityBackupFilter.apply(node, newAssignment); + } + else if (backupFilter != null) { if (tier == 0) { List<ClusterNode> 
assigment = assignments.get(part); @@ -891,6 +964,8 @@ public class FairAffinityFunction implements AffinityFunction { else return (backupFilter.apply(assignments.get(part).get(0), node)); } + else + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java index 37258d4..990eba1 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -111,6 +111,11 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza /** Optional backup filter. First node is primary, second node is a node being tested. */ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + /** Optional affinity backups filter. The first node is a node being tested, + * the second is a list of nodes that are already assigned for a given partition (the first node in the list + * is primary). */ + private IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter; + /** Hash ID resolver. */ private AffinityNodeHashResolver hashIdRslvr = null; @@ -277,12 +282,40 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. * * @param backupFilter Optional backup filter. + * @deprecated Use {@code affinityBackupFilter} instead. 
*/ + @Deprecated public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { this.backupFilter = backupFilter; } /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, List<ClusterNode>> getAffinityBackupFilter() { + return affinityBackupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is a node being tested, + * and the second parameter is a list of nodes that are already assigned for a given partition (primary node is the first in the list). + * <p> + * Note that {@code affinityBackupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param affinityBackupFilter Optional backup filter. + */ + public void setAffinityBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter) { + this.affinityBackupFilter = affinityBackupFilter; + } + + /** * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). * <p> * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. 
@@ -384,7 +417,11 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza if (!allNeighbors.contains(node)) res.add(node); } - else if (backupFilter == null || backupFilter.apply(primary, node)) + else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res)) + res.add(next.get2()); + else if (backupFilter != null && backupFilter.apply(primary, node)) + res.add(next.get2()); + else if (affinityBackupFilter == null && backupFilter == null) res.add(next.get2()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java index 3bf41c1..f01f5d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java @@ -18,6 +18,10 @@ package org.apache.ignite.cache.affinity; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -29,6 +33,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import 
static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -41,15 +46,18 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC /** Ip finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** Backup count. */ - private static final int BACKUPS = 1; - /** Split attribute name. */ private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute"; /** Split attribute value. */ private String splitAttrVal; + /** Attribute value for first node group. */ + public static final String FIRST_NODE_GROUP = "A"; + + /** Backup count. */ + private int backups = 1; + /** Test backup filter. */ protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter = new IgniteBiPredicate<ClusterNode, ClusterNode>() { @@ -61,13 +69,63 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC } }; + /** Test backup filter. */ + protected static final IgniteBiPredicate<ClusterNode, List<ClusterNode>> affinityBackupFilter = + new IgniteBiPredicate<ClusterNode, List<ClusterNode>>() { + @Override public boolean apply(ClusterNode node, List<ClusterNode> assigned) { + assert node != null : "primary is null"; + assert assigned != null : "backup is null"; + + Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned); + + String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME); + + if (FIRST_NODE_GROUP.equals(nodeAttributeValue) + && backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2) + return true; + + return backupAssignedAttribute.get(nodeAttributeValue).equals(0); + } + }; + + /** + * @param nodes List of cluster nodes. + * @return Statistic. 
+ */ + @NotNull private static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) { + Map<String, Integer> backupAssignedAttribute = new HashMap<>(); + + backupAssignedAttribute.put(FIRST_NODE_GROUP, 0); + + backupAssignedAttribute.put("B", 0); + + backupAssignedAttribute.put("C", 0); + + for (ClusterNode assignedNode: nodes) { + if (assignedNode == null) + continue; + + String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME); + + Integer count = backupAssignedAttribute.get(val); + + backupAssignedAttribute.put(val, count + 1); + } + return backupAssignedAttribute; + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { CacheConfiguration cacheCfg = defaultCacheConfiguration(); cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(BACKUPS); - cacheCfg.setAffinity(affinityFunction()); + cacheCfg.setBackups(backups); + + if (backups < 2) + cacheCfg.setAffinity(affinityFunction()); + else + cacheCfg.setAffinity(affinityFunctionWithAffinityBackupFilter()); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); cacheCfg.setRebalanceMode(SYNC); cacheCfg.setAtomicityMode(TRANSACTIONAL); @@ -90,9 +148,15 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC protected abstract AffinityFunction affinityFunction(); /** + * @return Affinity function for test. + */ + protected abstract AffinityFunction affinityFunctionWithAffinityBackupFilter(); + + /** * @throws Exception If failed. */ public void testPartitionDistribution() throws Exception { + backups = 1; try { for (int i = 0; i < 3; i++) { splitAttrVal = "A"; @@ -135,4 +199,61 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME))); } } + + /** + * @throws Exception If failed. 
+ */ + public void testPartitionDistributionWithAffinityBackupFilter() throws Exception { + backups = 3; + try { + for (int i = 0; i < 2; i++) { + splitAttrVal = FIRST_NODE_GROUP; + + startGrid(4 * i); + + startGrid(4 * i + 3); + + splitAttrVal = "B"; + + startGrid(4 * i + 1); + + splitAttrVal = "C"; + + startGrid(4 * i + 2); + + awaitPartitionMapExchange(); + + checkPartitionsWithAffinityBackupFilter(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private void checkPartitionsWithAffinityBackupFilter() throws Exception { + AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity(); + + int partCnt = aff.partitions(); + + IgniteCache<Object, Object> cache = grid(0).cache(null); + + for (int i = 0; i < partCnt; i++) { + Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i); + + assertEquals(backups + 1, nodes.size()); + + Map<String, Integer> stat = getAttributeStatistic(nodes); + + assertEquals(stat.get(FIRST_NODE_GROUP), new Integer(2)); + + assertEquals(stat.get("B"), new Integer(1)); + + assertEquals(stat.get("C"), new Integer(1)); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java index eedc9e4..7fddf30 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java @@ -32,4 +32,13 @@ public 
class FairAffinityFunctionBackupFilterSelfTest extends AffinityFunctionBa return aff; } + + /** {@inheritDoc} */ + @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter() { + FairAffinityFunction aff = new FairAffinityFunction(false); + + aff.setAffinityBackupFilter(affinityBackupFilter); + + return aff; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f175d3c6/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java index d5d8b8f..a78c383 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java @@ -32,4 +32,13 @@ public class RendezvousAffinityFunctionBackupFilterSelfTest extends AffinityFunc return aff; } + + /** {@inheritDoc} */ + @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter() { + RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false); + + aff.setAffinityBackupFilter(affinityBackupFilter); + + return aff; + } }
