This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a148e7b6441 IGNITE-26583 Rebalancing optimization for MultiDC - Fixes #12473.
a148e7b6441 is described below
commit a148e7b6441993704f662ddcab4ab045c833c84e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Nov 1 16:57:58 2025 +0300
IGNITE-26583 Rebalancing optimization for MultiDC - Fixes #12473.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../jol/GridAffinityAssignmentJolBenchmark.java | 1 -
.../org/apache/ignite/cluster/ClusterNode.java | 3 +-
.../org/apache/ignite/internal/IgniteKernal.java | 7 +
.../managers/discovery/GridDiscoveryManager.java | 8 -
.../dht/preloader/GridDhtPreloader.java | 26 ++-
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 3 -
.../discovery/tcp/internal/TcpDiscoveryNode.java | 18 +-
.../affinity/GridAffinityAssignmentV2Test.java | 1 -
.../rebalancing/MultiDcRebalancingTest.java | 217 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite8.java | 3 +
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 3 -
.../zk/internal/ZookeeperClusterNode.java | 16 +-
12 files changed, 257 insertions(+), 49 deletions(-)
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
index d75f590be08..df474379073 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jol/GridAffinityAssignmentJolBenchmark.java
@@ -328,7 +328,6 @@ public class GridAffinityAssignmentJolBenchmark {
private static ClusterNode node(int idx) {
TcpDiscoveryNode node = new TcpDiscoveryNode(
UUID.randomUUID(),
- null,
Collections.singletonList("127.0.0.1"),
Collections.singletonList("127.0.0.1"),
0,
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index ff02b9704a0..bc698c38889 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
@@ -183,7 +184,7 @@ public interface ClusterNode extends BaselineNode {
*/
@IgniteExperimental
@Nullable public default String dataCenterId() {
- return null;
+ return attribute(IgniteNodeAttributes.ATTR_DATA_CENTER_ID);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a3e0ac10b61..1878ef042e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -221,6 +221,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singleton;
import static java.util.Optional.ofNullable;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
@@ -236,6 +237,7 @@ import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
+import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
@@ -1606,6 +1608,11 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
add(ATTR_TX_SERIALIZABLE_ENABLED,
cfg.getTransactionConfiguration().isTxSerializableEnabled());
add(ATTR_TX_AWARE_QUERIES_ENABLED,
cfg.getTransactionConfiguration().isTxAwareQueriesEnabled());
+ if (IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID) != null)
+ add(ATTR_DATA_CENTER_ID,
IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID));
+ else if (userAttrs != null && userAttrs.get(IGNITE_DATA_CENTER_ID) !=
null)
+ add(ATTR_DATA_CENTER_ID,
(Serializable)userAttrs.get(IGNITE_DATA_CENTER_ID));
+
// Stick in SPI versions and classes attributes.
addSpiAttributes(cfg.getCollisionSpi());
addSpiAttributes(cfg.getDiscoverySpi());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 14e26e7af65..7334cfc16c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -150,7 +150,6 @@ import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_CENTER_ID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE;
@@ -166,7 +165,6 @@ import static
org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
@@ -486,12 +484,6 @@ public class GridDiscoveryManager extends
GridManagerAdapter<DiscoverySpi> {
ctx.addNodeAttribute(ATTR_OFFHEAP_SIZE, requiredOffheap());
ctx.addNodeAttribute(ATTR_DATA_REGIONS_OFFHEAP_SIZE,
configuredOffheap());
- // TODO When exposing to public interface, replace the retrieval in
IgniteClusterNode implementations.
- String dcId = IgniteSystemProperties.getString(IGNITE_DATA_CENTER_ID);
-
- if (dcId != null)
- ctx.addNodeAttribute(ATTR_DATA_CENTER_ID, dcId);
-
DiscoverySpi spi = getSpi();
discoOrdered = discoOrdered();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 606ba1352ae..b4126c955f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,7 +19,9 @@ package
org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,7 +49,9 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
@@ -195,6 +199,10 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
CachePartitionFullCountersMap countersMap =
grp.topology().fullUpdateCounters();
+ String dcId = ctx.localNode().dataCenterId();
+ Collection<UUID> sameDcNodeIds = dcId == null ? null : new
HashSet<>(F.viewReadOnly(
+ ctx.discovery().aliveServerNodes(), ClusterNode::id, n ->
Objects.equals(n.dataCenterId(), dcId)));
+
for (int p = 0; p < partitions; p++) {
if (ctx.exchange().hasPendingServerExchange()) {
if (log.isDebugEnabled())
@@ -228,8 +236,12 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
if (grp.persistenceEnabled() && exchFut != null) {
List<UUID> nodeIds =
exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
- if (!F.isEmpty(nodeIds))
+ if (!F.isEmpty(nodeIds)) {
+ if (sameDcNodeIds != null)
+ nodeIds = retainNodesNotEmpty(nodeIds,
sameDcNodeIds::contains);
+
histSupplier = ctx.discovery().node(nodeIds.get(p %
nodeIds.size()));
+ }
}
if (histSupplier != null && !exchFut.isClearingPartition(grp,
p)) {
@@ -260,6 +272,9 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
});
if (!picked.isEmpty()) {
+ if (dcId != null)
+ picked = retainNodesNotEmpty(picked, n ->
Objects.equals(dcId, n.dataCenterId()));
+
ClusterNode n = picked.get(p % picked.size());
GridDhtPartitionDemandMessage msg = assignments.get(n);
@@ -303,6 +318,15 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
return assignments;
}
+ /**
+ * Retains nodes which satisfy filter. Returns original list if result set
is empty.
+ */
+ private <T> List<T> retainNodesNotEmpty(List<T> nodes, IgnitePredicate<T>
filter) {
+ List<T> nodes0 = U.arrayList(nodes, filter);
+
+ return !F.isEmpty(nodes0) ? nodes0 : nodes;
+ }
+
/** {@inheritDoc} */
@Override public void onReconnected() {
startFut = new GridFutureAdapter<>();
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index f3ee38abbdc..6df10d88a0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -114,7 +114,6 @@ import org.jetbrains.annotations.TestOnly;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
-import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
/**
@@ -1169,8 +1168,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
locNode = new TcpDiscoveryNode(
ignite.configuration().getNodeId(),
- //TODO remove usage of internal API when an alternative from
public API is available
-
(String)((IgniteEx)ignite).context().nodeAttributes().get(ATTR_DATA_CENTER_ID),
addrs.get1(),
addrs.get2(),
srvPort,
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 36727c5b66d..89f1f492e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -48,7 +48,6 @@ import
org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
@@ -74,9 +73,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
@GridToStringExclude
private Map<String, Object> attrs;
- /** Data center ID of the node. */
- private String dcId;
-
/** Internal discovery addresses as strings. */
@GridToStringInclude
private Collection<String> addrs;
@@ -159,7 +155,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
* Constructor.
*
* @param id Node Id.
- * @param dcId ID of a data center where this node is started ({@code
null} if there is only one data center).
* @param addrs Addresses.
* @param hostNames Host names.
* @param discPort Port.
@@ -168,7 +163,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
* @param consistentId Node consistent ID.
*/
public TcpDiscoveryNode(UUID id,
- String dcId,
Collection<String> addrs,
Collection<String> hostNames,
int discPort,
@@ -181,7 +175,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
assert ver != null;
this.id = id;
- this.dcId = dcId;
List<String> sortedAddrs = new ArrayList<>(addrs);
@@ -360,11 +353,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
this.ver = ver;
}
- /** {@inheritDoc} */
- @Override public @Nullable String dataCenterId() {
- return dcId;
- }
-
/** {@inheritDoc} */
@Override public Collection<String> addresses() {
return addrs;
@@ -541,7 +529,7 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
*/
public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs)
{
TcpDiscoveryNode node = new TcpDiscoveryNode(
- id, dcId, addrs, hostNames, discPort, metricsProvider, ver, null
+ id, addrs, hostNames, discPort, metricsProvider, ver, null
);
node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs));
@@ -627,8 +615,6 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
consistentId = consistentIdAttr != null ? consistentIdAttr : id;
else
consistentId = consistentIdAttr != null ? consistentIdAttr :
U.consistentId(addrs, discPort);
-
- dcId = (String)attrs.get(ATTR_DATA_CENTER_ID);
}
/** {@inheritDoc} */
@@ -643,7 +629,7 @@ public class TcpDiscoveryNode extends
GridMetadataAwareAdapter implements Ignite
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(TcpDiscoveryNode.class, this, "isClient",
isClient());
+ return S.toString(TcpDiscoveryNode.class, this, "isClient",
isClient(), "dataCenterId", dataCenterId());
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
index 7d3caae9f63..5d8e966851c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentV2Test.java
@@ -237,7 +237,6 @@ public class GridAffinityAssignmentV2Test {
protected TcpDiscoveryNode node(DiscoveryMetricsProvider metrics,
IgniteProductVersion v, String consistentId) {
TcpDiscoveryNode node = new TcpDiscoveryNode(
UUID.randomUUID(),
- null,
Collections.singletonList("127.0.0.1"),
Collections.singletonList("127.0.0.1"),
0,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java
new file mode 100644
index 00000000000..bb71087a7e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/MultiDcRebalancingTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
+
+/** */
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
+public class MultiDcRebalancingTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DC1 = "DC1";
+
+ /** */
+ private static final String DC2 = "DC2";
+
+ /** */
+ private boolean pds;
+
+ /** */
+ private IgniteEx startGrid(int idx, String dcId) throws Exception {
+ return startGrid(getConfiguration(getTestIgniteInstanceName(idx))
+ .setDataStorageConfiguration(new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(pds)))
+ .setConsistentId(getTestIgniteInstanceName(idx))
+ .setCommunicationSpi(new RebalanceAwareCommSPI())
+
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
dcId)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testFullRebalanceSameDC() throws Exception {
+ checkFullRebalance(false);
+ }
+
+ /** */
+ @Test
+ public void testFullRebalanceCrossDC() throws Exception {
+ checkFullRebalance(true);
+ }
+
+ /** */
+ @Test
+ public void testHistoricalRebalanceSameDC() throws Exception {
+ checkHistoricalRebalance(false);
+ }
+
+ /** */
+ @Test
+ public void testHistoricalRebalanceCrossDC() throws Exception {
+ checkHistoricalRebalance(true);
+ }
+
+ /** */
+ private void checkFullRebalance(boolean crossDC) throws Exception {
+ startGrid(0, DC1);
+ startGrid(1, crossDC ? DC1 : DC2);
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteCache<Object, Object> cache = grid(0).createCache(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME).setBackups(1));
+
+ for (int i = 0; i < 10_000; i++)
+ cache.put(i, i);
+
+ IgniteEx ignite2 = startGrid(2, crossDC ? DC2 : DC1);
+ IgniteEx ignite3 = startGrid(3, DC2);
+
+ resetBaselineTopology();
+
+ waitRebalanceFinished(ignite2);
+ waitRebalanceFinished(ignite3);
+
+ assertTrue(commSPI(ignite2).rebalanceMsgCnt > 0);
+ assertTrue(commSPI(ignite3).rebalanceMsgCnt > 0);
+ assertFalse(commSPI(ignite2).historical);
+ assertFalse(commSPI(ignite3).historical);
+ assertEquals(crossDC, commSPI(ignite2).crossDcRebalance);
+ assertEquals(crossDC, commSPI(ignite3).crossDcRebalance);
+ }
+
+ /** */
+ private void checkHistoricalRebalance(boolean crossDC) throws Exception {
+ pds = true;
+
+ startGrid(0, DC1);
+ startGrid(1, DC2);
+ startGrid(2, DC1);
+
+ grid(0).cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Object, Object> cache = grid(0).createCache(
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME).setBackups(crossDC ?
1 : 2));
+
+ for (int i = 0; i < 10_000; i++)
+ cache.put(i, i);
+
+ forceCheckpoint();
+
+ stopGrid(2);
+
+ for (int i = 10_000; i < 20_000; i++)
+ cache.put(i, i);
+
+ IgniteEx ignite2 = startGrid(2, DC1);
+
+ waitRebalanceFinished(ignite2);
+
+ assertTrue(commSPI(ignite2).rebalanceMsgCnt > 0);
+ assertTrue(commSPI(ignite2).historical);
+ assertEquals(crossDC, commSPI(ignite2).crossDcRebalance);
+ }
+
+ /** */
+ private RebalanceAwareCommSPI commSPI(Ignite ignite) {
+ return
(RebalanceAwareCommSPI)(ignite.configuration().getCommunicationSpi());
+ }
+
+ /** */
+ private void waitRebalanceFinished(IgniteEx ignite) throws Exception {
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ IgniteInternalFuture<Boolean> fut =
ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture();
+
+ GridDhtPartitionDemander.RebalanceFuture rebFut =
(GridDhtPartitionDemander.RebalanceFuture)fut;
+
+ return (!rebFut.isInitial() &&
rebFut.topologyVersion().topologyVersion() ==
ignite.cluster().topologyVersion());
+ }, 1000));
+
+
assertTrue(ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture().get());
+ }
+
+ /** */
+ private static class RebalanceAwareCommSPI extends TcpCommunicationSpi {
+ /** */
+ private int rebalanceMsgCnt;
+
+ /** */
+ private boolean historical;
+
+ /** */
+ private boolean crossDcRebalance;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ ClusterNode node,
+ Message msg,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteSpiException {
+ if (((GridIoMessage)msg).message() instanceof
GridDhtPartitionDemandMessage) {
+ rebalanceMsgCnt++;
+
+ GridDhtPartitionDemandMessage demandMsg =
(GridDhtPartitionDemandMessage)((GridIoMessage)msg).message();
+
+ historical |= demandMsg.partitions().hasHistorical();
+
+ if
(!ignite.cluster().localNode().dataCenterId().equals(node.dataCenterId()))
+ crossDcRebalance = true;
+ }
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
index 065fff5173e..7e238e85f0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite8.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
+import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.MultiDcRebalancingTest;
import
org.apache.ignite.internal.processors.cache.persistence.CleanupRestoredCachesSlowTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -76,6 +77,8 @@ public class IgniteCacheTestSuite8 {
GridTestUtils.addTestIfNeeded(suite,
GridCacheRebalancingAsyncSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
GridCacheRebalancingCancelTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, MultiDcRebalancingTest.class,
ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite,
CacheStoreTxPutAllMultiNodeTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CleanupRestoredCachesSlowTest.class, ignoredTests);
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index e72444fff1c..c439dce3708 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -64,7 +64,6 @@ import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
-import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DISCO_METRICS;
/**
@@ -526,8 +525,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter
implements IgniteDis
ZookeeperClusterNode locNode = new ZookeeperClusterNode(
cfg.getNodeId(),
- //TODO remove usage of internal API when an alternative from
public API is available
-
(String)((IgniteEx)ignite).context().nodeAttributes().get(ATTR_DATA_CENTER_ID),
addrs.get1(),
addrs.get2(),
locNodeVer,
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
index d70b9d0aaa5..bd657912425 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -40,7 +40,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_CENTER_ID;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.eqNodes;
@@ -72,9 +71,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
/** Node attributes. */
private Map<String, Object> attrs;
- /** Data Center ID. */
- private String dcId;
-
/** Internal discovery addresses as strings. */
private Collection<String> addrs;
@@ -107,7 +103,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
/**
* @param id Node ID.
- * @param dcId Data Center ID.
* @param addrs Node addresses.
* @param hostNames Node host names.
* @param ver Node version.
@@ -119,7 +114,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
*/
public ZookeeperClusterNode(
UUID id,
- String dcId,
Collection<String> addrs,
Collection<String> hostNames,
IgniteProductVersion ver,
@@ -134,7 +128,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
this.id = id;
this.ver = ver;
- this.dcId = dcId;
this.attrs = Collections.unmodifiableMap(attrs);
this.addrs = addrs;
this.hostNames = hostNames;
@@ -240,11 +233,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
return F.view(attrs, new SecurityCredentialsAttrFilterPredicate());
}
- /** {{@inheritDoc} */
- @Override public @Nullable String dataCenterId() {
- return dcId;
- }
-
/** {@inheritDoc} */
@Override public Collection<String> addresses() {
return addrs;
@@ -361,8 +349,6 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
if (mtr != null)
metrics = ClusterMetricsSnapshot.deserialize(mtr, 0);
-
- dcId = (String)attrs.get(ATTR_DATA_CENTER_ID);
}
/** {@inheritDoc} */
@@ -394,7 +380,7 @@ public class ZookeeperClusterNode implements
IgniteClusterNode, Externalizable,
/** {@inheritDoc} */
@Override public String toString() {
return "ZookeeperClusterNode [id=" + id +
- ", dataCenterId=" + dcId +
+ ", dataCenterId=" + dataCenterId() +
", addrs=" + addrs +
", order=" + order +
", loc=" + loc +