This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c140fbcd37 IGNITE-26580 Implement an affinity backup filter assigning partition copies evenly between data centers (#12459)
5c140fbcd37 is described below

commit 5c140fbcd375079ed0e1bbe9b0a3c4159e86d715
Author: Sergey Chugunov <[email protected]>
AuthorDate: Tue Nov 11 11:36:37 2025 +0400

    IGNITE-26580 Implement an affinity backup filter assigning partition copies evenly between data centers (#12459)
---
 .../rendezvous/MdcAffinityBackupFilter.java        | 128 ++++++
 .../processors/cache/ClusterCachesInfo.java        |   2 +
 .../MdcAffinityBackupFilterSelfTest.java           | 460 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   2 +
 4 files changed, 592 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java
new file mode 100644
index 00000000000..d800728056e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Multi-data center affinity backup filter that ensures each partition's data is distributed across multiple data centers,
+ * providing high availability and fault tolerance. This implementation guarantees at least one copy of the data in each
+ * data center and attempts to maintain the configured backup factor without discarding copies.
+ * <p>
+ * The filter works by grouping nodes based on their data center identification attribute (see {@link ClusterNode#dataCenterId()})
+ * and ensuring that for every partition, at least one node from each data center is included in the primary-backup set.
+ * <p>
+ * The filter will discard backup copies only if the number of available nodes in a given data center is less
+ * than the number of copies assigned to that data center.
+ * For example, if a partition has 4 copies (1 primary and 3 backups) and the cluster has 2 data centers,
+ * then 2 copies are assigned to each data center. The only scenario in which just a single copy is placed
+ * in a data center is when that data center contains only one node.
+ * <p>
+ * This class is constructed with the number of data centers the cluster spans and the number of backups of the cache
+ * this filter is applied to. The implementation expects that all copies can be spread evenly across all data centers,
+ * i.e. (backups + 1) is divisible by the number of data centers without remainder. Uneven distributions of copies
+ * are not supported.
+ * <p>
+ * Warning: Ensure that all nodes have a consistent and valid data center identifier attribute. Missing or inconsistent
+ * values may lead to unexpected placement of data.
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template where each data center has at least one copy of the data, and the backup count is maintained.
+ * <pre name="code" class="xml">
+ * &lt;property name="cacheConfiguration"&gt;
+ *     &lt;list&gt;
+ *         &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
+ *             &lt;property name="name" value="JobcaseDefaultCacheConfig*"/&gt;
+ *             &lt;property name="cacheMode" value="PARTITIONED" /&gt;
+ *             &lt;property name="backups" value="3" /&gt;
+ *             &lt;property name="affinity"&gt;
+ *                 &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
+ *                     &lt;property name="affinityBackupFilter"&gt;
+ *                         &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter"&gt;
+ *                             &lt;constructor-arg value="2"/&gt; &lt;!-- dcsNumber --&gt;
+ *                             &lt;constructor-arg value="3"/&gt; &lt;!-- backups, the same as in the cache template --&gt;
+ *                         &lt;/bean&gt;
+ *                     &lt;/property&gt;
+ *                 &lt;/bean&gt;
+ *             &lt;/property&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/list&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ * With more backups, additional replicas can be distributed across different data centers to further improve redundancy.
+ */
+public class MdcAffinityBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** */
+    private final int partCopiesPerDc;
+
+    /**
+     * @param dcsNum Number of data centers.
+     * @param backups Number of backups.
+     */
+    public MdcAffinityBackupFilter(int dcsNum, int backups) {
+        if (dcsNum < 2) {
+            throw new IllegalArgumentException("MdcAffinityBackupFilter cannot be used in an environment with only one datacenter. " +
+                "Number of datacenters must be at least 2.");
+        }
+
+        int numCopies = backups + 1;
+
+        partCopiesPerDc = numCopies / dcsNum;
+        int remainder = numCopies % dcsNum;
+
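+        // E.g. dcsNum = 2, backups = 3: numCopies = 4, partCopiesPerDc = 2, remainder = 0 (accepted);
+        // dcsNum = 2, backups = 2: numCopies = 3, remainder = 1 (rejected below).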
+        if (remainder != 0) {
+            String suggestion = "recommended ";
+            if (numCopies - remainder <= 0)
+                suggestion += "value is " + (backups + (dcsNum - remainder));
+            else
+                suggestion += "values are " + (backups - remainder) + " and " + (backups + (dcsNum - remainder));
+
+            throw new IllegalArgumentException("Number of copies is not completely divisible by number of datacenters, " +
+                "copies cannot be distributed evenly across DCs. " +
+                "Please adjust the number of backups, " + suggestion);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
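+        // A candidate is accepted while its data center holds fewer than partCopiesPerDc
+        // of the already selected copies; any previously selected node without a data center ID vetoes it.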
+        String candidateDcId = candidate.dataCenterId();
+        int candDcCopiesAssigned = 0;
+
+        for (int i = 0; i < previouslySelected.size(); i++) {
+            String prevDcId = previouslySelected.get(i).dataCenterId();
+
+            if (prevDcId == null)
+                return false;
+
+            candDcCopiesAssigned += prevDcId.equals(candidateDcId) ? 1 : 0;
+        }
+
+        return candDcCopiesAssigned < partCopiesPerDc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MdcAffinityBackupFilter.class, this);
+    }
+}
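
For reference, a minimal programmatic equivalent of the Spring XML example from the javadoc above (a sketch; the cache name "myCache" and the partition count of 1024 are illustrative, not part of the patch):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheMode;
    import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilter;
    import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class MdcFilterConfigExample {
        public static void main(String[] args) {
            // 1 primary + 3 backups = 4 copies, spread over 2 DCs: exactly 2 copies per DC.
            CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<Integer, String>("myCache")
                .setCacheMode(CacheMode.PARTITIONED)
                .setBackups(3)
                .setAffinity(new RendezvousAffinityFunction(false, 1024)
                    .setAffinityBackupFilter(new MdcAffinityBackupFilter(2, 3)));

            try (Ignite ignite = Ignition.start(new IgniteConfiguration().setCacheConfiguration(cacheCfg))) {
                ignite.cache("myCache").put(1, "value");
            }
        }
    }
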
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index fc18b656530..4d4fb35575d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -434,6 +434,8 @@ public class ClusterCachesInfo {
             "Affinity partitions count", locAttr.affinityPartitionsCount(),
             rmtAttr.affinityPartitionsCount(), true);
 
+        // TODO IGNITE-26967 - implement validation of affinity backup filter.
+
         CU.validateKeyConfigiration(rmtAttr.groupName(), rmtAttr.cacheName(), rmt, rmtAttr.configuration().getKeyConfiguration(),
             locAttr.configuration().getKeyConfiguration(), log, true);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java
new file mode 100644
index 00000000000..a45f3ccd901
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/MdcAffinityBackupFilterSelfTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Verifies behaviour of {@link MdcAffinityBackupFilter} - guarantees that each DC has at least one copy of every partition.
+ * Verifies distribution uniformity in each DC separately.
+ */
+public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int PARTS_CNT = 512;
+
+    /** */
+    private int backups;
+
+    /** */
+    private String[] dcIds;
+
+    /** */
+    private IgniteBiPredicate<ClusterNode, List<ClusterNode>> backupFilter;
+
+    /** */
+    private boolean persistenceEnabled;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration()
+            .setBackups(backups)
+            .setAffinity(
+                new RendezvousAffinityFunction(false, PARTS_CNT)
+                    .setAffinityBackupFilter(backupFilter)));
+
+        if (persistenceEnabled) {
+            cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setPersistenceEnabled(true)
+                    )
+            );
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+    }
+
+    /**
+     * Verifies that {@link MdcAffinityBackupFilter} prohibits single data center deployment.
+     */
+    @Test
+    public void testSingleDcDeploymentIsProhibited() {
+        assertThrows(null,
+            () -> new MdcAffinityBackupFilter(1, 1),
+            IllegalArgumentException.class,
+            "Number of datacenters must be at least 2.");
+    }
+
+    /**
+     * Verifies that {@link MdcAffinityBackupFilter} enforces a uniform number of partition copies per datacenter.
+     */
+    @Test
+    public void testUniformNumberOfPartitionCopiesPerDcIsEnforced() {
+        assertThrows(null,
+            () -> new MdcAffinityBackupFilter(3, 1),
+            IllegalArgumentException.class,
+            "recommended value is 2");
+
+        assertThrows(null,
+            () -> new MdcAffinityBackupFilter(3, 7),
+            IllegalArgumentException.class,
+            "recommended values are 5 and 8");
+    }
+
+    /**
+     * Verifies that the filter doesn't assign additional copies if the baseline topology is not changed,
+     * even though this can break the `at least one copy of a partition per datacenter` guarantee.
+     * <p/>
+     * Verifies that the guarantee is restored after a baseline topology change.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMdcFilterWithBaselineTopology() throws Exception {
+        persistenceEnabled = true;
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 2;
+        backups = 1;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        srv.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Map<Integer, Set<UUID>> dc1OldAffDistr = affinityForPartitions(cache, node -> node.dataCenterId().equals(dcIds[1]));
+
+        UUID srv0Id = grid(0).localNode().id();
+
+        Map<Integer, Set<UUID>> dc0AffDistr = affinityForPartitions(cache, node -> node.dataCenterId().equals(dcIds[0]));
+
+        Set<Integer> srv0Partitions = dc0AffDistr
+            .entrySet().stream()
+            .filter(entry -> entry.getValue().contains(srv0Id))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+
+        stopGrid(0);
+
+        dc0AffDistr = affinityForPartitions(cache, node -> node.dataCenterId().equals(dcIds[0]));
+
+        // Partitions from the stopped node are not assigned to any node without a baseline topology change.
+        assertFalse(dc0AffDistr.keySet().containsAll(srv0Partitions));
+
+        Map<Integer, Set<UUID>> dc1NewAffDistr = affinityForPartitions(cache, node -> node.dataCenterId().equals(dcIds[1]));
+
+        // Assignment of partitions in remote DC has not changed.
+        assertEquals(dc1OldAffDistr, dc1NewAffDistr);
+
+        srv.cluster().setBaselineTopology(srv.cluster().topologyVersion());
+        awaitPartitionMapExchange();
+
+        Set<Integer> allParts = Stream.iterate(0, i -> i + 1).limit(PARTS_CNT).collect(Collectors.toSet());
+        dc0AffDistr = affinityForPartitions(cache, node -> node.dataCenterId().equals(dcIds[0]));
+        // After baseline topology change, guarantee is restored.
+        assertTrue(dc0AffDistr.keySet().containsAll(allParts));
+    }
+
+    /**
+     * Verifies that partition copies are assigned evenly across a cluster in two data centers.
+     * <p/>
+     * When a node from one data center is stopped, partition distribution in that data center should stay uniform.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test2DcDistribution() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 4;
+        backups = 3;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+
+        // Stopping one node in DC_1 should not compromise distribution as there are additional nodes in the same DC.
+        stopGrid(5);
+
+        awaitPartitionMapExchange();
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+
+        // Stopping another node in DC_1 should not compromise distribution either.
+        stopGrid(6);
+
+        awaitPartitionMapExchange();
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+    }
+
+    /**
+     * Verifies that partition copies are assigned evenly across a cluster in three data centers.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test3DcDistribution() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1", "DC_2"};
+        int nodesPerDc = 2;
+        backups = 5;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+
+        verifyDistributionGuarantees(srv, dcIds, nodesPerDc, backups);
+    }
+
+    /**
+     * Verifies that a node is prohibited from joining the cluster if its affinityBackupFilter configuration differs
+     * from the one specified in the cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-26967")
+    public void testAffinityFilterConfigurationValidation() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        backups = 3;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+        startGrid(0);
+
+        backupFilter = new ClusterNodeAttributeAffinityBackupFilter("DC_ID");
+        try {
+            startGrid(1);
+
+            fail("Expected exception was not thrown.");
+        }
+        catch (IgniteCheckedException e) {
+            String errMsg = e.getMessage();
+
+            assertNotNull(errMsg);
+
+            assertTrue(errMsg.contains("Affinity backup filter class mismatch"));
+        }
+
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups + dcIds.length);
+        try {
+            startGrid(1);
+
+            fail("Expected exception was not thrown.");
+        }
+        catch (IgniteCheckedException e) {
+            String errMsg = e.getMessage();
+
+            assertNotNull(errMsg);
+
+            assertTrue(errMsg.contains("Affinity backup filter mismatch"));
+        }
+    }
+
+    /**
+     * Verifies that the distribution of partitions in one datacenter
+     * doesn't change if nodes leave in another or if the other DC goes down completely.
+     * In other words, no rebalance is triggered in one DC if topology changes happen in another.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNoRebalanceInOneDcIfTopologyChangesInAnother() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 3;
+        backups = 3;
+        backupFilter = new MdcAffinityBackupFilter(dcIds.length, backups);
+        Predicate<ClusterNode> dc1NodesFilter = node -> dcIds[1].equals(node.dataCenterId());
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+        awaitPartitionMapExchange();
+        Map<Integer, Set<UUID>> oldDistribution = affinityForPartitions(srv.getOrCreateCache(DEFAULT_CACHE_NAME), dc1NodesFilter);
+
+        oldDistribution = verifyNoRebalancing(0, true, null, srv, oldDistribution, dc1NodesFilter);
+        oldDistribution = verifyNoRebalancing(1, true, null, srv, oldDistribution, dc1NodesFilter);
+        // Start srv0 back in DC_0 to make sure that neither starting nor stopping a server affects
+        // affinity distribution in the remote DC.
+        oldDistribution = verifyNoRebalancing(0, false, dcIds[0], srv, oldDistribution, dc1NodesFilter);
+        // Stop it again.
+        oldDistribution = verifyNoRebalancing(0, true, null, srv, oldDistribution, dc1NodesFilter);
+        verifyNoRebalancing(2, true, null, srv, oldDistribution, dc1NodesFilter);
+    }
+
+    /** Starts specified number of nodes in each DC. */
+    private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception {
+        int nodeIdx = 0;
+        IgniteEx lastNode = null;
+
+        for (String dcId : dcIds) {
+            System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId);
+
+            for (int i = 0; i < nodesPerDc; i++)
+                lastNode = startGrid(nodeIdx++);
+        }
+
+        return lastNode;
+    }
+
+    /** */
+    private Map<Integer, Set<UUID>> verifyNoRebalancing(
+        int srvIdx,
+        boolean stopSrv,
+        String startingSrvDcId,
+        IgniteEx srv,
+        Map<Integer, Set<UUID>> oldDistribution,
+        Predicate<ClusterNode> dc1NodesFilter
+    ) throws Exception {
+        if (stopSrv)
+            stopGrid(srvIdx);
+        else {
+            System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, startingSrvDcId);
+            startGrid(srvIdx);
+        }
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Map<Integer, Set<UUID>> newDistribution = affinityForPartitions(cache, dc1NodesFilter);
+
+        assertEquals(
+            String.format("Affinity distribution changed after server node %d was stopped or started", srvIdx),
+            oldDistribution,
+            newDistribution);
+
+        return newDistribution;
+    }
+
+    /** */
+    private Map<Integer, Set<UUID>> affinityForPartitions(IgniteCache<Integer, Integer> cache, Predicate<ClusterNode> dcFilter) {
+        Map<Integer, Set<UUID>> result = new HashMap<>(PARTS_CNT);
+        Affinity<Integer> aff = affinity(cache);
+
+        for (int i = 0; i < PARTS_CNT; i++) {
+            int j = i;
+
+            // For each partition, collect UUIDs of all its affinity nodes passing the provided filter.
+            aff.mapKeyToPrimaryAndBackups(i)
+                .stream()
+                .filter(dcFilter)
+                .forEach(node -> result.computeIfAbsent(j, k -> new HashSet<>()).add(node.id()));
+        }
+
+        return result;
+    }
+
+    /**
+     * Checks that copies of each partition are distributed evenly across data centers and copies are spread evenly across nodes.
+     */
+    private void verifyDistributionGuarantees(
+        IgniteEx srv,
+        String[] dcIds,
+        int nodesPerDc,
+        int backups
+    ) {
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        int partCnt = cacheConfiguration(srv.configuration(), cache.getName()).getAffinity().partitions();
+        Affinity<Integer> aff = affinity(cache);
+        int expectedCopiesPerDc = (backups + 1) / dcIds.length;
+
+        Map<ClusterNode, Integer> overallCopiesPerNode = new HashMap<>();
+        int[] copiesPerNode = new int[dcIds.length * nodesPerDc];
+
+        for (int partId = 0; partId < partCnt; partId++) {
+            int[] partCopiesPerDc = new int[dcIds.length];
+
+            Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(partId);
+
+            // Calculate the actual number of copies in each data center
+            // and aggregate copies per node.
+            for (ClusterNode node : nodes) {
+                copiesPerNode[(int)(node.order() - 1)]++;
+
+                overallCopiesPerNode.merge(node, 1, Integer::sum);
+
+                for (int j = 0; j < dcIds.length; j++) {
+                    if (node.dataCenterId().equals(dcIds[j])) {
+                        partCopiesPerDc[j]++;
+                        break;
+                    }
+                }
+            }
+
+            verifyCopyInEachDcGuarantee(partId, expectedCopiesPerDc, partCopiesPerDc);
+        }
+
+        verifyDistributionUniformity(dcIds, overallCopiesPerNode);
+    }
+
+    /** */
+    private void verifyCopyInEachDcGuarantee(int partId, int expectedCopiesPerDc, int[] partCopiesPerDc) {
+        for (int dcIdx = 0; dcIdx < dcIds.length; dcIdx++) {
+            assertEquals(String.format("Unexpected number of copies of partition %d in data center %s", partId, dcIds[dcIdx]),
+                expectedCopiesPerDc,
+                partCopiesPerDc[dcIdx]);
+        }
+    }
+
+    /** */
+    private void verifyDistributionUniformity(String[] dcIds, Map<ClusterNode, Integer> overallCopiesPerNode) {
+        for (String dcId : dcIds) {
+            long nodesInDc = overallCopiesPerNode.entrySet().stream().filter(e -> e.getKey().dataCenterId().equals(dcId)).count();
+
+            double idealCopiesPerNode = (double)(PARTS_CNT * (backups + 1)) / (dcIds.length * nodesInDc);
+
+            List<Integer> numOfCopiesPerNode = overallCopiesPerNode.entrySet().stream()
+                .filter(e -> e.getKey().dataCenterId().equals(dcId)).map(Map.Entry::getValue).collect(Collectors.toList());
+
+            for (int copiesOnNode : numOfCopiesPerNode) {
+                double deviation = Math.abs(copiesOnNode - idealCopiesPerNode) / idealCopiesPerNode;
+
+                assertTrue(
+                    String.format("Too big deviation from ideal distribution: partitions assigned = %d, " +
+                            "ideal partitions assigned = %d, deviation = %d%%",
+                        copiesOnNode,
+                        (int)idealCopiesPerNode,
+                        (int)(deviation * 100)
+                    ),
+                    deviation < 0.1);
+            }
+        }
+    }
+}
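
A minimal sketch of bringing up a cluster that spans two data centers, mirroring startClusterAcrossDataCenters() above (the instance names are illustrative; the data center ID is picked up from the IGNITE_DATA_CENTER_ID system property, as in the test):

    import org.apache.ignite.Ignition;
    import org.apache.ignite.IgniteSystemProperties;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class MdcClusterStartSketch {
        public static void main(String[] args) {
            String[] dcIds = {"DC_0", "DC_1"};
            int nodesPerDc = 2;
            int nodeIdx = 0;

            for (String dcId : dcIds) {
                // Each node reads its data center ID from this system property at startup.
                System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId);

                for (int i = 0; i < nodesPerDc; i++)
                    Ignition.start(new IgniteConfiguration().setIgniteInstanceName("node-" + nodeIdx++));
            }
        }
    }
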
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 0c44992d8e4..97a6ab93f69 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.MdcAffinityBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -348,6 +349,7 @@ public class IgniteCacheTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, RendezvousAffinityFunctionBackupFilterSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, MdcAffinityBackupFilterSelfTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class, ignoredTests);
