This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a148e7b6441 IGNITE-26583 Rebalancing optimization for MultiDC - Fixes 
#12473.
a148e7b6441 is described below

commit a148e7b6441993704f662ddcab4ab045c833c84e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Nov 1 16:57:58 2025 +0300

    IGNITE-26583 Rebalancing optimization for MultiDC - Fixes #12473.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../jol/GridAffinityAssignmentJolBenchmark.java    |   1 -
 .../org/apache/ignite/cluster/ClusterNode.java     |   3 +-
 .../org/apache/ignite/internal/IgniteKernal.java   |   7 +
 .../managers/discovery/GridDiscoveryManager.java   |   8 -
 .../dht/preloader/GridDhtPreloader.java            |  26 ++-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |   3 -
 .../discovery/tcp/internal/TcpDiscoveryNode.java   |  18 +-
 .../affinity/GridAffinityAssignmentV2Test.java     |   1 -
 .../rebalancing/MultiDcRebalancingTest.java        | 217 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite8.java   |   3 +
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java    |   3 -
 .../zk/internal/ZookeeperClusterNode.java          |  16 +-
 12 files changed, 257 insertions(+), 49 deletions(-)

diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
index d75f590be08..df474379073 100644
--- 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
@@ -328,7 +328,6 @@ public class GridAffinityAssignmentJolBenchmark {
     private static ClusterNode node(int idx) {
         TcpDiscoveryNode node = new TcpDiscoveryNode(
             UUID.randomUUID(),
-            null,
             Collections.singletonList("127.0.0.1"),
             Collections.singletonList("127.0.0.1"),
             0,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java 
b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index ff02b9704a0..bc698c38889 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -183,7 +184,7 @@ public interface ClusterNode extends BaselineNode {
      */
     @IgniteExperimental
     @Nullable public default String dataCenterId() {
-        return null;
+        return attribute(IgniteNodeAttributes.ATTR_DATA_CENTER_ID);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a3e0ac10b61..1878ef042e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -221,6 +221,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.util.Collections.singleton;
 import static java.util.Optional.ofNullable;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -236,6 +237,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
@@ -1606,6 +1608,11 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
         add(ATTR_TX_SERIALIZABLE_ENABLED, 
cfg.getTransactionConfiguration().isTxSerializableEnabled());
         add(ATTR_TX_AWARE_QUERIES_ENABLED, 
cfg.getTransactionConfiguration().isTxAwareQueriesEnabled());
 
+        if (IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID) != null)
+            add(ATTR_DATA_CENTER_ID, 
IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID));
+        else if (userAttrs != null && userAttrs.get(IGNITE_DATA_CENTER_ID) != 
null)
+            add(ATTR_DATA_CENTER_ID, 
(Serializable)userAttrs.get(IGNITE_DATA_CENTER_ID));
+
         // Stick in SPI versions and classes attributes.
         addSpiAttributes(cfg.getCollisionSpi());
         addSpiAttributes(cfg.getDiscoverySpi());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 14e26e7af65..7334cfc16c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -150,7 +150,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
@@ -166,7 +165,6 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
@@ -486,12 +484,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         ctx.addNodeAttribute(ATTR_OFFHEAP_SIZE, requiredOffheap());
         ctx.addNodeAttribute(ATTR_DATA_REGIONS_OFFHEAP_SIZE, 
configuredOffheap());
 
-        // TODO When exposing to public interface, replace the retrieval in 
IgniteClusterNode implementations.
-        String dcId = IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID);
-
-        if (dcId != null)
-            ctx.addNodeAttribute(ATTR_DATA_CENTER_ID, dcId);
-
         DiscoverySpi spi = getSpi();
 
         discoOrdered = discoOrdered();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 606ba1352ae..b4126c955f9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,9 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,7 +49,9 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
@@ -195,6 +199,10 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
         CachePartitionFullCountersMap countersMap = 
grp.topology().fullUpdateCounters();
 
+        String dcId = ctx.localNode().dataCenterId();
+        Collection<UUID> sameDcNodeIds = dcId == null ? null : new 
HashSet<>(F.viewReadOnly(
+            ctx.discovery().aliveServerNodes(), ClusterNode::id, n -> 
Objects.equals(n.dataCenterId(), dcId)));
+
         for (int p = 0; p < partitions; p++) {
             if (ctx.exchange().hasPendingServerExchange()) {
                 if (log.isDebugEnabled())
@@ -228,8 +236,12 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                 if (grp.persistenceEnabled() && exchFut != null) {
                     List<UUID> nodeIds = 
exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
 
-                    if (!F.isEmpty(nodeIds))
+                    if (!F.isEmpty(nodeIds)) {
+                        if (sameDcNodeIds != null)
+                            nodeIds = retainNodesNotEmpty(nodeIds, 
sameDcNodeIds::contains);
+
                         histSupplier = ctx.discovery().node(nodeIds.get(p % 
nodeIds.size()));
+                    }
                 }
 
                 if (histSupplier != null && !exchFut.isClearingPartition(grp, 
p)) {
@@ -260,6 +272,9 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                     });
 
                     if (!picked.isEmpty()) {
+                        if (dcId != null)
+                            picked = retainNodesNotEmpty(picked, n -> 
Objects.equals(dcId, n.dataCenterId()));
+
                         ClusterNode n = picked.get(p % picked.size());
 
                         GridDhtPartitionDemandMessage msg = assignments.get(n);
@@ -303,6 +318,15 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
         return assignments;
     }
 
+    /**
+     * Retains nodes which satisfy filter. Returns original list if result set 
is empty.
+     */
+    private <T> List<T> retainNodesNotEmpty(List<T> nodes, IgnitePredicate<T> 
filter) {
+        List<T> nodes0 = U.arrayList(nodes, filter);
+
+        return !F.isEmpty(nodes0) ? nodes0 : nodes;
+    }
+
     /** {@inheritDoc} */
     @Override public void onReconnected() {
         startFut = new GridFutureAdapter<>();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index f3ee38abbdc..6df10d88a0f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -114,7 +114,6 @@ import org.jetbrains.annotations.TestOnly;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
 
 /**
@@ -1169,8 +1168,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
 
         locNode = new TcpDiscoveryNode(
             ignite.configuration().getNodeId(),
-            //TODO remove usage of internal API when an alternative from 
public API is available
-            
(String)((IgniteEx)ignite).context().nodeAttributes().get(ATTR_DATA_CENTER_ID),
             addrs.get1(),
             addrs.get2(),
             srvPort,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 36727c5b66d..89f1f492e92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -48,7 +48,6 @@ import 
org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
 import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
 
@@ -74,9 +73,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
     @GridToStringExclude
     private Map<String, Object> attrs;
 
-    /** Data center ID of the node. */
-    private String dcId;
-
     /** Internal discovery addresses as strings. */
     @GridToStringInclude
     private Collection<String> addrs;
@@ -159,7 +155,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
      * Constructor.
      *
      * @param id Node Id.
-     * @param dcId ID of a data center where this node is started ({@code 
null} if there is only one data center).
      * @param addrs Addresses.
      * @param hostNames Host names.
      * @param discPort Port.
@@ -168,7 +163,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
      * @param consistentId Node consistent ID.
      */
     public TcpDiscoveryNode(UUID id,
-        String dcId,
         Collection<String> addrs,
         Collection<String> hostNames,
         int discPort,
@@ -181,7 +175,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
         assert ver != null;
 
         this.id = id;
-        this.dcId = dcId;
 
         List<String> sortedAddrs = new ArrayList<>(addrs);
 
@@ -360,11 +353,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
         this.ver = ver;
     }
 
-    /** {@inheritDoc} */
-    @Override public @Nullable String dataCenterId() {
-        return dcId;
-    }
-
     /** {@inheritDoc} */
     @Override public Collection<String> addresses() {
         return addrs;
@@ -541,7 +529,7 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
      */
     public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) 
{
         TcpDiscoveryNode node = new TcpDiscoveryNode(
-            id, dcId, addrs, hostNames, discPort, metricsProvider, ver, null
+            id, addrs, hostNames, discPort, metricsProvider, ver, null
         );
 
         node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs));
@@ -627,8 +615,6 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
             consistentId = consistentIdAttr != null ? consistentIdAttr : id;
         else
             consistentId = consistentIdAttr != null ? consistentIdAttr : 
U.consistentId(addrs, discPort);
-
-        dcId = (String)attrs.get(ATTR_DATA_CENTER_ID);
     }
 
     /** {@inheritDoc} */
@@ -643,7 +629,7 @@ public class TcpDiscoveryNode extends 
GridMetadataAwareAdapter implements Ignite
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(TcpDiscoveryNode.class, this, "isClient", 
isClient());
+        return S.toString(TcpDiscoveryNode.class, this, "isClient", 
isClient(), "dataCenterId", dataCenterId());
     }
 
     /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
index 7d3caae9f63..5d8e966851c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
@@ -237,7 +237,6 @@ public class GridAffinityAssignmentV2Test {
     protected TcpDiscoveryNode node(DiscoveryMetricsProvider metrics, 
IgniteProductVersion v, String consistentId) {
         TcpDiscoveryNode node = new TcpDiscoveryNode(
             UUID.randomUUID(),
-            null,
             Collections.singletonList("127.0.0.1"),
             Collections.singletonList("127.0.0.1"),
             0,
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java
new file mode 100644
index 00000000000..bb71087a7e7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
+
+/** */
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
+public class MultiDcRebalancingTest extends GridCommonAbstractTest {
+    /** */
+    private static final String DC1 = "DC1";
+
+    /** */
+    private static final String DC2 = "DC2";
+
+    /** */
+    private boolean pds;
+
+    /** */
+    private IgniteEx startGrid(int idx, String dcId) throws Exception {
+        return startGrid(getConfiguration(getTestIgniteInstanceName(idx))
+            .setDataStorageConfiguration(new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setPersistenceEnabled(pds)))
+            .setConsistentId(getTestIgniteInstanceName(idx))
+            .setCommunicationSpi(new RebalanceAwareCommSPI())
+            
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, 
dcId)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testFullRebalanceSameDC() throws Exception {
+        checkFullRebalance(false);
+    }
+
+    /** */
+    @Test
+    public void testFullRebalanceCrossDC() throws Exception {
+        checkFullRebalance(true);
+    }
+
+    /** */
+    @Test
+    public void testHistoricalRebalanceSameDC() throws Exception {
+        checkHistoricalRebalance(false);
+    }
+
+    /** */
+    @Test
+    public void testHistoricalRebalanceCrossDC() throws Exception {
+        checkHistoricalRebalance(true);
+    }
+
+    /** */
+    private void checkFullRebalance(boolean crossDC) throws Exception {
+        startGrid(0, DC1);
+        startGrid(1, crossDC ? DC1 : DC2);
+
+        grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteCache<Object, Object> cache = grid(0).createCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setBackups(1));
+
+        for (int i = 0; i < 10_000; i++)
+            cache.put(i, i);
+
+        IgniteEx ignite2 = startGrid(2, crossDC ? DC2 : DC1);
+        IgniteEx ignite3 = startGrid(3, DC2);
+
+        resetBaselineTopology();
+
+        waitRebalanceFinished(ignite2);
+        waitRebalanceFinished(ignite3);
+
+        assertTrue(commSPI(ignite2).rebalanceMsgCnt > 0);
+        assertTrue(commSPI(ignite3).rebalanceMsgCnt > 0);
+        assertFalse(commSPI(ignite2).historical);
+        assertFalse(commSPI(ignite3).historical);
+        assertEquals(crossDC, commSPI(ignite2).crossDcRebalance);
+        assertEquals(crossDC, commSPI(ignite3).crossDcRebalance);
+    }
+
+    /** */
+    private void checkHistoricalRebalance(boolean crossDC) throws Exception {
+        pds = true;
+
+        startGrid(0, DC1);
+        startGrid(1, DC2);
+        startGrid(2, DC1);
+
+        grid(0).cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Object, Object> cache = grid(0).createCache(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME).setBackups(crossDC ? 
1 : 2));
+
+        for (int i = 0; i < 10_000; i++)
+            cache.put(i, i);
+
+        forceCheckpoint();
+
+        stopGrid(2);
+
+        for (int i = 10_000; i < 20_000; i++)
+            cache.put(i, i);
+
+        IgniteEx ignite2 = startGrid(2, DC1);
+
+        waitRebalanceFinished(ignite2);
+
+        assertTrue(commSPI(ignite2).rebalanceMsgCnt > 0);
+        assertTrue(commSPI(ignite2).historical);
+        assertEquals(crossDC, commSPI(ignite2).crossDcRebalance);
+    }
+
+    /** */
+    private RebalanceAwareCommSPI commSPI(Ignite ignite) {
+        return 
(RebalanceAwareCommSPI)(ignite.configuration().getCommunicationSpi());
+    }
+
+    /** */
+    private void waitRebalanceFinished(IgniteEx ignite) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            IgniteInternalFuture<Boolean> fut = 
ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture();
+
+            GridDhtPartitionDemander.RebalanceFuture rebFut = 
(GridDhtPartitionDemander.RebalanceFuture)fut;
+
+            return (!rebFut.isInitial() && 
rebFut.topologyVersion().topologyVersion() == 
ignite.cluster().topologyVersion());
+        }, 1000));
+
+        
assertTrue(ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture().get());
+    }
+
+    /** */
+    private static class RebalanceAwareCommSPI extends TcpCommunicationSpi {
+        /** */
+        private int rebalanceMsgCnt;
+
+        /** */
+        private boolean historical;
+
+        /** */
+        private boolean crossDcRebalance;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            ClusterNode node,
+            Message msg,
+            IgniteInClosure<IgniteException> ackClosure
+        ) throws IgniteSpiException {
+            if (((GridIoMessage)msg).message() instanceof 
GridDhtPartitionDemandMessage) {
+                rebalanceMsgCnt++;
+
+                GridDhtPartitionDemandMessage demandMsg = 
(GridDhtPartitionDemandMessage)((GridIoMessage)msg).message();
+
+                historical |= demandMsg.partitions().hasHistorical();
+
+                if 
(!ignite.cluster().localNode().dataCenterId().equals(node.dataCenterId()))
+                    crossDcRebalance = true;
+            }
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
index 065fff5173e..7e238e85f0b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC
 import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.rebalancing.MultiDcRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.CleanupRestoredCachesSlowTest;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -76,6 +77,8 @@ public class IgniteCacheTestSuite8 {
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRebalancingAsyncSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheRebalancingCancelTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, MultiDcRebalancingTest.class, 
ignoredTests);
+
         GridTestUtils.addTestIfNeeded(suite, 
CacheStoreTxPutAllMultiNodeTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
CleanupRestoredCachesSlowTest.class, ignoredTests);
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index e72444fff1c..c439dce3708 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -64,7 +64,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
 
 /**
@@ -526,8 +525,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDis
 
         ZookeeperClusterNode locNode = new ZookeeperClusterNode(
             cfg.getNodeId(),
-            //TODO remove usage of internal API when an alternative from 
public API is available
-            
(String)((IgniteEx)ignite).context().nodeAttributes().get(ATTR_DATA_CENTER_ID),
             addrs.get1(),
             addrs.get2(),
             locNodeVer,
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
index d70b9d0aaa5..bd657912425 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -40,7 +40,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
 import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
 
@@ -72,9 +71,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
     /** Node attributes. */
     private Map<String, Object> attrs;
 
-    /** Data Center ID. */
-    private String dcId;
-
     /** Internal discovery addresses as strings. */
     private Collection<String> addrs;
 
@@ -107,7 +103,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
 
     /**
      * @param id Node ID.
-     * @param dcId Data Center ID.
      * @param addrs Node addresses.
      * @param hostNames Node host names.
      * @param ver Node version.
@@ -119,7 +114,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
      */
     public ZookeeperClusterNode(
         UUID id,
-        String dcId,
         Collection<String> addrs,
         Collection<String> hostNames,
         IgniteProductVersion ver,
@@ -134,7 +128,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
 
         this.id = id;
         this.ver = ver;
-        this.dcId = dcId;
         this.attrs = Collections.unmodifiableMap(attrs);
         this.addrs = addrs;
         this.hostNames = hostNames;
@@ -240,11 +233,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
         return F.view(attrs, new SecurityCredentialsAttrFilterPredicate());
     }
 
-    /** {{@inheritDoc} */
-    @Override public @Nullable String dataCenterId() {
-        return dcId;
-    }
-
     /** {@inheritDoc} */
     @Override public Collection<String> addresses() {
         return addrs;
@@ -361,8 +349,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
 
         if (mtr != null)
             metrics = ClusterMetricsSnapshot.deserialize(mtr, 0);
-
-        dcId = (String)attrs.get(ATTR_DATA_CENTER_ID);
     }
 
     /** {@inheritDoc} */
@@ -394,7 +380,7 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Externalizable,
     /** {@inheritDoc} */
     @Override public String toString() {
         return "ZookeeperClusterNode [id=" + id +
-            ", dataCenterId=" + dcId +
+            ", dataCenterId=" + dataCenterId() +
             ", addrs=" + addrs +
             ", order=" + order +
             ", loc=" + loc +

Reply via email to