This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5c140fbcd37 IGNITE-26580 Implement an affinity backup filter assigning
partition copies evenly between data centers (#12459)
5c140fbcd37 is described below
commit 5c140fbcd375079ed0e1bbe9b0a3c4159e86d715
Author: Sergey Chugunov <[email protected]>
AuthorDate: Tue Nov 11 11:36:37 2025 +0400
IGNITE-26580 Implement an affinity backup filter assigning partition copies
evenly between data centers (#12459)
---
.../rendezvous/MdcAffinityBackupFilter.java | 128 ++++++
.../processors/cache/ClusterCachesInfo.java | 2 +
.../MdcAffinityBackupFilterSelfTest.java | 460 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite2.java | 2 +
4 files changed, 592 insertions(+)
diff --git
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java
new file mode 100644
index 00000000000..d800728056e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Multi-data center affinity backup filter that ensures each partition's data
is distributed across multiple data centers,
+ * providing high availability and fault tolerance. This implementation
guarantees at least one copy of the data in each
+ * data center and attempts to maintain the configured backup factor without
discarding copies.
+ * <p>
+ * The filter works by grouping nodes based on their data center identification attribute (see {@link ClusterNode#dataCenterId()})
+ * and ensuring that for every partition, at least one node from each data center is included in the primary-backup set.
+ * <p>
+ * The filter will discard backup copies only if the number of available nodes
in a given data center is less
+ * than the number of copies assigned to that data center.
+ * For example, if a partition has 4 copies (1 primary and 3 backups) and the cluster has 2 data centers,
+ * then 2 copies are assigned to each data center. The only scenario in which just a single copy is placed in a data center is when
+ * the number of nodes in that data center is one.
+ * <p>
+ * This class is constructed with a number of data centers the cluster spans
and a number of backups of the cache this filter is applied to.
+ * Implementation expects that all copies can be spread evenly across all data
centers. In other words, (backups + 1) is divisible by
+ * number of data centers without remainder. Uneven distributions of copies
are not supported.
+ * <p>
+ * Warning: Ensure that all nodes have a consistent and valid data center identifier attribute. Missing or inconsistent values
+ * may lead to unexpected placement of data.
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template where each data center has at least one
copy of the data, and the backup count is maintained.
+ * <pre name="code" class="xml">
+ * <property name="cacheConfiguration">
+ * <list>
+ * <bean id="cache-template-bean" abstract="true"
class="org.apache.ignite.configuration.CacheConfiguration">
+ * <property name="name" value="JobcaseDefaultCacheConfig*"/>
+ * <property name="cacheMode" value="PARTITIONED" />
+ * <property name="backups" value="3" />
+ * <property name="affinity">
+ * <bean
class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+ * <property name="affinityBackupFilter">
+ * <bean
class="org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter">
+ * <constructor-arg value="2"/> <!--
dcsNumber -->
+ * <constructor-arg value="3"/> <!--
backups, the same as in the cache template -->
+ * </bean>
+ * </property>
+ * </bean>
+ * </property>
+ * </bean>
+ * </list>
+ * </property>
+ * </pre>
+ * <p>
+ * With more backups, additional replicas can be distributed across different
data centers to further improve redundancy.
+ */
+public class MdcAffinityBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** Maximum number of copies (primary and backups) of a partition allowed in a single data center. */
+    private final int partCopiesPerDc;
+
+    /**
+     * Creates a filter that allows at most {@code (backups + 1) / dcsNum} copies of a partition per data center.
+     *
+     * @param dcsNum Number of data centers, must be at least 2.
+     * @param backups Number of backups, must be non-negative and such that {@code backups + 1} is divisible by {@code dcsNum}.
+     * @throws IllegalArgumentException If {@code dcsNum} is less than 2, if {@code backups} is negative,
+     *      or if the copies cannot be spread evenly across the data centers.
+     */
+    public MdcAffinityBackupFilter(int dcsNum, int backups) {
+        if (dcsNum < 2) {
+            throw new IllegalArgumentException("MdcAffinityBackupFilter cannot be used in an environment with only one datacenter. " +
+                "Number of datacenters must be at least 2.");
+        }
+
+        // A negative value with (backups + 1) divisible by dcsNum (e.g. -1) would otherwise pass the divisibility
+        // check below and silently produce a filter that rejects every candidate.
+        if (backups < 0)
+            throw new IllegalArgumentException("Number of backups must not be negative: " + backups);
+
+        int numCopies = backups + 1;
+
+        partCopiesPerDc = numCopies / dcsNum;
+        int remainder = numCopies % dcsNum;
+
+        if (remainder != 0) {
+            String suggestion = "recommended ";
+
+            // There is no valid smaller value when rounding down would leave no copies at all.
+            if (numCopies - remainder <= 0)
+                suggestion += "value is " + (backups + (dcsNum - remainder));
+            else
+                suggestion += "values are " + (backups - remainder) + " and " + (backups + (dcsNum - remainder));
+
+            throw new IllegalArgumentException("Number of copies is not completely divisible by number of datacenters, " +
+                "copies cannot be distributed evenly across DCs. " +
+                "Please adjust the number of backups, " + suggestion);
+        }
+    }
+
+    /**
+     * Accepts the candidate only if its data center still holds fewer copies of the partition than allowed.
+     *
+     * @param candidate Node considered as the next copy holder.
+     * @param previouslySelected Nodes already selected for the partition.
+     * @return {@code true} if the candidate may hold a copy, {@code false} otherwise. Returns {@code false}
+     *      whenever the candidate or any previously selected node has no data center ID.
+     */
+    @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
+        String candidateDcId = candidate.dataCenterId();
+
+        // A node without a data center ID cannot be attributed to any DC. Accepting it would also make
+        // every subsequent candidate fail the null check in the loop below.
+        if (candidateDcId == null)
+            return false;
+
+        int candDcCopiesAssigned = 0;
+
+        for (int i = 0; i < previouslySelected.size(); i++) {
+            String prevDcId = previouslySelected.get(i).dataCenterId();
+
+            if (prevDcId == null)
+                return false;
+
+            if (prevDcId.equals(candidateDcId))
+                candDcCopiesAssigned++;
+        }
+
+        return candDcCopiesAssigned < partCopiesPerDc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MdcAffinityBackupFilter.class, this);
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index fc18b656530..4d4fb35575d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -434,6 +434,8 @@ public class ClusterCachesInfo {
"Affinity partitions count", locAttr.affinityPartitionsCount(),
rmtAttr.affinityPartitionsCount(), true);
+ // TODO IGNITE-26967 - implement validation of affinity backup filter.
+
CU.validateKeyConfigiration(rmtAttr.groupName(), rmtAttr.cacheName(),
rmt, rmtAttr.configuration().getKeyConfiguration(),
locAttr.configuration().getKeyConfiguration(), log, true);
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java
new file mode 100644
index 00000000000..a45f3ccd901
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Verifies behaviour of {@link MdcAffinityBackupFilter} - guarantees that each DC has at least one copy of every partition.
+ * Verifies distribution uniformity in each DC separately.
+ */
+public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
+ /** Number of partitions passed to the {@link RendezvousAffinityFunction} of the test cache. */
+ private static final int PARTS_CNT = 512;
+
+ /** Number of backups configured for the test cache; set by each test before nodes are started. */
+ private int backups;
+
+ /** Identifiers of the data centers used by the current test. */
+ private String[] dcIds;
+
+ /** Affinity backup filter installed into the affinity function of the test cache. */
+ private IgniteBiPredicate<ClusterNode, List<ClusterNode>> backupFilter;
+
+ /** When {@code true}, the default data region of started nodes is configured with persistence enabled. */
+ private boolean persistenceEnabled;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Configures the default cache with the test's backup count and a rendezvous affinity function
+     * that uses the test's backup filter. Enables persistence for the default data region on demand.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        RendezvousAffinityFunction affFunc = new RendezvousAffinityFunction(false, PARTS_CNT);
+
+        affFunc.setAffinityBackupFilter(backupFilter);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration().setBackups(backups).setAffinity(affFunc));
+
+        if (!persistenceEnabled)
+            return cfg;
+
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(true);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegionCfg));
+
+        return cfg;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all nodes, removes persistence files and resets the data center ID system property.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Nodes are stopped before the persistence directory is cleaned.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ // The property is set while starting nodes in this test class; clear it so it doesn't affect later tests.
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+
+    /**
+     * Checks that {@link MdcAffinityBackupFilter} cannot be created for a deployment with a single data center.
+     */
+    @Test
+    public void testSingleDcDeploymentIsProhibited() {
+        int singleDc = 1;
+        int backupsNum = 1;
+        String expMsg = "Number of datacenters must be at least 2.";
+
+        assertThrows(null, () -> new MdcAffinityBackupFilter(singleDc, backupsNum), IllegalArgumentException.class, expMsg);
+    }
+
+    /**
+     * Checks that {@link MdcAffinityBackupFilter} rejects backup counts for which partition copies
+     * cannot be split evenly between data centers, and that the error suggests suitable values.
+     */
+    @Test
+    public void testUniformNumberOfPartitionCopiesPerDcIsEnforced() {
+        assertFilterCreationFails(3, 1, "recommended value is 2");
+
+        assertFilterCreationFails(3, 7, "recommended values are 5 and 8");
+    }
+
+    /** Asserts that creating a filter with the given arguments fails with a message containing {@code expMsg}. */
+    private void assertFilterCreationFails(int dcsNum, int backupsNum, String expMsg) {
+        assertThrows(null,
+            () -> new MdcAffinityBackupFilter(dcsNum, backupsNum),
+            IllegalArgumentException.class,
+            expMsg);
+    }
+
+    /**
+     * Verifies that the filter doesn't assign additional copies while the baseline topology is unchanged,
+     * even though this temporarily breaks the guarantee `at least one copy of partition per datacenter`.
+     * <p/>
+     * Verifies that after baseline topology change, guarantee is restored.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMdcFilterWithBaselineTopology() throws Exception {
+        persistenceEnabled = true;
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 2;
+        backups = 1;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        srv.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Predicate<ClusterNode> dc0NodesFilter = node -> dcIds[0].equals(node.dataCenterId());
+        Predicate<ClusterNode> dc1NodesFilter = node -> dcIds[1].equals(node.dataCenterId());
+
+        Map<Integer, Set<UUID>> dc1OldAffDistr = affinityForPartitions(cache, dc1NodesFilter);
+
+        UUID srv0Id = grid(0).localNode().id();
+
+        Map<Integer, Set<UUID>> dc0AffDistr = affinityForPartitions(cache, dc0NodesFilter);
+
+        Set<Integer> srv0Partitions = dc0AffDistr
+            .entrySet().stream()
+            .filter(entry -> entry.getValue().contains(srv0Id))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+
+        stopGrid(0);
+
+        dc0AffDistr = affinityForPartitions(cache, dc0NodesFilter);
+
+        // Partitions from stopped node are not assigned to any node without baseline topology change.
+        assertFalse(dc0AffDistr.keySet().containsAll(srv0Partitions));
+
+        Map<Integer, Set<UUID>> dc1NewAffDistr = affinityForPartitions(cache, dc1NodesFilter);
+
+        // Assignment of partitions in remote DC has not changed.
+        assertEquals(dc1OldAffDistr, dc1NewAffDistr);
+
+        srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
+        awaitPartitionMapExchange();
+
+        Set<Integer> allParts = Stream.iterate(0, i -> i + 1).limit(PARTS_CNT).collect(Collectors.toSet());
+        dc0AffDistr = affinityForPartitions(cache, dc0NodesFilter);
+
+        // After baseline topology change, guarantee is restored: every partition has a copy in DC_0 again.
+        // The map contains only partitions that have at least one owner in DC_0, so its key set must cover all partitions.
+        assertEquals(allParts, dc0AffDistr.keySet());
+    }
+
+    /**
+     * Checks that partition copies are spread evenly over a cluster spanning two data centers.
+     * <p/>
+     * When nodes of one data center are stopped, partition distribution in that data center should stay uniform.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test2DcDistribution() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 4;
+        backups = 3;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+
+        // Stopping nodes in DC_1 one by one should not compromise distribution
+        // as there are additional nodes in the same DC.
+        int[] dc1NodesToStop = {5, 6};
+
+        for (int nodeIdx : dc1NodesToStop) {
+            stopGrid(nodeIdx);
+
+            awaitPartitionMapExchange();
+
+            verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+        }
+    }
+
+    /**
+     * Verifies that partition copies are assigned evenly across a cluster in three data centers.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test3DcDistribution() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1", "DC_2"};
+        int nodesPerDc = 2;
+        backups = 5;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        // Start exactly as many nodes per DC as the verification below is told about.
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+    }
+
+    /**
+     * Checks that a node whose affinityBackupFilter configuration differs from the cluster's one
+     * is not allowed to join.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-26967")
+    public void testAffinityFilterConfigurationValidation() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        backups = 3;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+        startGrid(0);
+
+        // A filter of a different class.
+        backupFilter = new ClusterNodeAttributeAffinityBackupFilter("DC_ID");
+        assertNodeStartRejected(1, "Affinity backup filter class mismatch");
+
+        // A filter of the same class created with different parameters.
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups + dcIds.length);
+        assertNodeStartRejected(1, "Affinity backup filter mismatch");
+    }
+
+    /** Tries to start the node with the given index and expects the start to fail with a message containing {@code expMsg}. */
+    private void assertNodeStartRejected(int nodeIdx, String expMsg) throws Exception {
+        try {
+            startGrid(nodeIdx);
+
+            fail("Expected exception was not thrown.");
+        }
+        catch (IgniteCheckedException e) {
+            String errMsg = e.getMessage();
+
+            assertNotNull(errMsg);
+
+            assertTrue(errMsg.contains(expMsg));
+        }
+    }
+
+ /**
+ * Verifies that the distribution of partitions in one data center doesn't change when nodes
+ * leave or join another data center, or when the other data center goes down completely.
+ * In other words, no rebalance is triggered in one DC by topology changes in another.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoRebalanceInOneDcIfTopologyChangesInAnother() throws
Exception {
+ dcIds = new String[] {"DC_0", "DC_1"};
+ int nodesPerDc = 3;
+ backups = 3;
+ backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+ Predicate<ClusterNode> dc1NodesFilter = node ->
dcIds[1].equals(node.dataCenterId());
+
+ IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+ awaitPartitionMapExchange();
+ Map<Integer, Set<UUID>> oldDistribution =
affinityForPartitions(srv.getOrCreateCache(DEFAULT_CACHE_NAME), dc1NodesFilter);
+
+ // Stop two of the three DC_0 servers one by one; the DC_1 distribution is compared after each step.
+ oldDistribution = verifyNoRebalancing(0, true, null, srv,
oldDistribution, dc1NodesFilter);
+ oldDistribution = verifyNoRebalancing(1, true, null, srv,
oldDistribution, dc1NodesFilter);
+ // Start srv0 back in DC_0 to make sure that neither starting nor stopping a server affects
+ // affinity distribution in remote DC.
+ oldDistribution = verifyNoRebalancing(0, false, dcIds[0], srv,
oldDistribution, dc1NodesFilter);
+ // Stop it again.
+ oldDistribution = verifyNoRebalancing(0, true, null, srv,
oldDistribution, dc1NodesFilter);
+ // Stop the last remaining DC_0 server, so DC_0 is down completely.
+ verifyNoRebalancing(2, true, null, srv, oldDistribution,
dc1NodesFilter);
+ }
+
+    /**
+     * Starts {@code nodesPerDc} nodes in every data center from {@code dcIds}. Node indexes are assigned
+     * sequentially, data center by data center.
+     *
+     * @param dcIds Data center identifiers.
+     * @param nodesPerDc Number of nodes to start in each data center.
+     * @return The node started last, or {@code null} if no node was started.
+     * @throws Exception If a node failed to start.
+     */
+    private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception {
+        IgniteEx lastStarted = null;
+        int startedCnt = 0;
+
+        for (int dcIdx = 0; dcIdx < dcIds.length; dcIdx++) {
+            // Nodes started from now on are attributed to this data center.
+            System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcIds[dcIdx]);
+
+            int dcEndIdx = startedCnt + nodesPerDc;
+
+            while (startedCnt < dcEndIdx) {
+                lastStarted = startGrid(startedCnt);
+
+                startedCnt++;
+            }
+        }
+
+        return lastStarted;
+    }
+
+    /**
+     * Stops or starts the server with the given index and verifies that the affinity distribution
+     * among the nodes accepted by {@code dc1NodesFilter} has not changed.
+     *
+     * @param srvIdx Index of the server to stop or start.
+     * @param stopSrv {@code true} to stop the server, {@code false} to start it.
+     * @param startingSrvDcId Data center ID for the started server; used only when {@code stopSrv} is {@code false}.
+     * @param srv Node used to obtain the cache.
+     * @param oldDistribution Distribution captured before the topology change.
+     * @param dc1NodesFilter Filter selecting the nodes whose distribution is compared.
+     * @return Distribution observed after the topology change.
+     * @throws Exception If failed.
+     */
+    private Map<Integer, Set<UUID>> verifyNoRebalancing(
+        int srvIdx,
+        boolean stopSrv,
+        String startingSrvDcId,
+        IgniteEx srv,
+        Map<Integer, Set<UUID>> oldDistribution,
+        Predicate<ClusterNode> dc1NodesFilter
+    ) throws Exception {
+        if (stopSrv)
+            stopGrid(srvIdx);
+        else {
+            System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, startingSrvDcId);
+            startGrid(srvIdx);
+        }
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Map<Integer, Set<UUID>> newDistribution = affinityForPartitions(cache, dc1NodesFilter);
+
+        // Report the action that was actually performed; the server may have been started rather than stopped.
+        assertEquals(
+            String.format("Affinity distribution changed after server node %d was %s", srvIdx, stopSrv ? "stopped" : "started"),
+            oldDistribution,
+            newDistribution);
+
+        return newDistribution;
+    }
+
+    /**
+     * Collects, for every index in {@code [0, PARTS_CNT)}, the IDs of its primary and backup nodes that pass
+     * the given filter. Indexes with no matching node are absent from the result.
+     *
+     * @param cache Cache whose affinity is inspected.
+     * @param dcFilter Filter applied to affinity nodes.
+     * @return Map from index to IDs of matching affinity nodes.
+     */
+    private Map<Integer, Set<UUID>> affinityForPartitions(IgniteCache<Integer, Integer> cache, Predicate<ClusterNode> dcFilter) {
+        Affinity<Integer> aff = affinity(cache);
+        Map<Integer, Set<UUID>> nodeIdsByPart = new HashMap<>(PARTS_CNT);
+
+        for (int part = 0; part < PARTS_CNT; part++) {
+            for (ClusterNode affNode : aff.mapKeyToPrimaryAndBackups(part)) {
+                if (!dcFilter.test(affNode))
+                    continue;
+
+                // The entry is created lazily, so only indexes with at least one matching node get into the map.
+                nodeIdsByPart.computeIfAbsent(part, k -> new HashSet<>()).add(affNode.id());
+            }
+        }
+
+        return nodeIdsByPart;
+    }
+
+    /**
+     * Checks that copies of each partition are distributed evenly across data centers and copies are spread
+     * evenly across nodes.
+     *
+     * @param srv Node used to obtain the cache and its configuration.
+     * @param dcIds Data center identifiers.
+     * @param nodesPerDc Number of nodes initially started in each data center (kept for call-site compatibility, not used).
+     * @param backups Number of backups configured for the cache.
+     */
+    private void verifyDistributionGuarantees(
+        IgniteEx srv,
+        String[] dcIds,
+        int nodesPerDc,
+        int backups
+    ) {
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        int partCnt = cacheConfiguration(srv.configuration(), cache.getName()).getAffinity().partitions();
+        Affinity<Integer> aff = affinity(cache);
+        int expectedCopiesPerDc = (backups + 1) / dcIds.length;
+
+        Map<ClusterNode, Integer> overallCopiesPerNode = new HashMap<>();
+
+        for (int partId = 0; partId < partCnt; partId++) {
+            int[] partCopiesPerDc = new int[dcIds.length];
+
+            Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(partId);
+
+            // Calculate the actual number of copies in each data center and aggregate copies per node.
+            // No array indexed by node order is kept: node order may exceed the initial cluster size.
+            for (ClusterNode node : nodes) {
+                overallCopiesPerNode.merge(node, 1, Integer::sum);
+
+                for (int j = 0; j < dcIds.length; j++) {
+                    if (dcIds[j].equals(node.dataCenterId())) {
+                        partCopiesPerDc[j]++;
+
+                        break;
+                    }
+                }
+            }
+
+            verifyCopyInEachDcGuarantee(partId, expectedCopiesPerDc, partCopiesPerDc);
+        }
+
+        verifyDistributionUniformity(dcIds, overallCopiesPerNode);
+    }
+
+    /**
+     * Asserts that every data center of the current test holds the expected number of copies of the partition.
+     *
+     * @param partId Partition ID, used in the failure message.
+     * @param expectedCopiesPerNode Expected number of copies of the partition in each data center.
+     * @param partCopiesPerDc Actual number of copies per data center, indexed in the order of {@code dcIds}.
+     */
+    private void verifyCopyInEachDcGuarantee(int partId, int expectedCopiesPerNode, int[] partCopiesPerDc) {
+        int dcIdx = 0;
+
+        for (String dcId : dcIds) {
+            String errMsg = String.format("Unexpected number of copies of partition %d in data center %s", partId, dcId);
+
+            assertEquals(errMsg, expectedCopiesPerNode, partCopiesPerDc[dcIdx]);
+
+            dcIdx++;
+        }
+    }
+
+    /**
+     * Asserts that, within every data center, the number of partition copies held by each node deviates
+     * from the ideal (perfectly even) value by less than 10%.
+     *
+     * @param dcIds Data center identifiers.
+     * @param overallCopiesPerNode Number of partition copies held by each node.
+     */
+    private void verifyDistributionUniformity(String[] dcIds, Map<ClusterNode, Integer> overallCopiesPerNode) {
+        for (String dcId : dcIds) {
+            List<Integer> numOfCopiesPerNode = overallCopiesPerNode.entrySet().stream()
+                .filter(e -> dcId.equals(e.getKey().dataCenterId()))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+
+            // Fail with a clear message instead of dividing by zero below.
+            assertFalse("No nodes holding partition copies found in data center " + dcId, numOfCopiesPerNode.isEmpty());
+
+            // Computed in floating point: integer division would truncate the ideal value before the comparison.
+            double idealCopiesPerNode = (double)PARTS_CNT * (backups + 1) / (dcIds.length * numOfCopiesPerNode.size());
+
+            for (int copiesOnNode : numOfCopiesPerNode) {
+                double deviation = Math.abs(copiesOnNode - idealCopiesPerNode) / idealCopiesPerNode;
+
+                assertTrue(
+                    String.format("Too big deviation from ideal distribution: partitions assigned = %d, " +
+                        "ideal partitions assigned = %d, deviation = %d",
+                        copiesOnNode,
+                        (int)idealCopiesPerNode,
+                        (int)(deviation * 100)
+                    ),
+                    deviation < 0.1);
+            }
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 0c44992d8e4..97a6ab93f69 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
import
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
+import
org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilterSelfTest;
import
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
import
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
import
org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -348,6 +349,7 @@ public class IgniteCacheTestSuite2 {
GridTestUtils.addTestIfNeeded(suite,
RendezvousAffinityFunctionBackupFilterSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
MdcAffinityBackupFilterSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class,
ignoredTests);