This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 86d4313f58a IGNITE-26582 Thick client optimizations for MultiDC
(#12504)
86d4313f58a is described below
commit 86d4313f58aa2a8d840876825e270f173eb755d8
Author: Anton Vinogradov <[email protected]>
AuthorDate: Mon Nov 17 21:45:44 2025 +0300
IGNITE-26582 Thick client optimizations for MultiDC (#12504)
---
.../ignite/spi/discovery/tcp/ClientImpl.java | 144 +++++++++++----------
.../ignite/spi/discovery/tcp/ServerImpl.java | 33 ++++-
.../tcp/messages/TcpDiscoveryAbstractMessage.java | 5 +-
.../tcp/messages/TcpDiscoveryHandshakeRequest.java | 14 ++
.../messages/TcpDiscoveryHandshakeResponse.java | 22 ++--
.../MultiDataCenterClientRoutingTest.java | 139 ++++++++++++++++++++
...a => MultiDataCenterTopologyValidatorTest.java} | 7 +-
.../IgniteSpiDiscoverySelfTestSuite.java | 4 +-
.../testsuites/IgniteSpiFailoverSelfTestSuite.java | 4 +-
9 files changed, 281 insertions(+), 91 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index caf8a530ffa..094d44ac1ca 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -76,7 +76,6 @@ import
org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -559,7 +558,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @throws IgniteSpiException If failed.
* @see TcpDiscoverySpi#joinTimeout
*/
- @Nullable private T2<SocketStream, Boolean> joinTopology(
+ @Nullable private SocketStream joinTopology(
InetSocketAddress prevAddr,
long timeout,
@Nullable Runnable beforeEachSleep,
@@ -605,50 +604,13 @@ class ClientImpl extends TcpDiscoveryImpl {
Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
- boolean wait = false;
+ T2<Boolean, T2<SocketStream, Integer>> waitAndRes =
sendJoinRequests(prevAddr != null, addrs);
- for (int i = addrs.size() - 1; i >= 0; i--) {
- if (Thread.currentThread().isInterrupted())
- throw new InterruptedException();
+ boolean wait = waitAndRes.get1();
+ T2<SocketStream, Integer> res = waitAndRes.get2();
- InetSocketAddress addr = addrs.get(i);
-
- boolean recon = prevAddr != null;
-
- T3<SocketStream, Integer, Boolean> sockAndRes =
sendJoinRequest(recon, addr);
-
- if (sockAndRes == null) {
- addrs.remove(i);
-
- continue;
- }
-
- assert sockAndRes.get1() != null && sockAndRes.get2() != null
: sockAndRes;
-
- Socket sock = sockAndRes.get1().socket();
-
- if (log.isDebugEnabled())
- log.debug("Received response to join request [addr=" +
addr + ", res=" + sockAndRes.get2() + ']');
-
- switch (sockAndRes.get2()) {
- case RES_OK:
- return new T2<>(sockAndRes.get1(), sockAndRes.get3());
-
- case RES_CONTINUE_JOIN:
- case RES_WAIT:
- wait = true;
-
- U.closeQuiet(sock);
-
- break;
-
- default:
- if (log.isDebugEnabled())
- log.debug("Received unexpected response to join
request: " + sockAndRes.get2());
-
- U.closeQuiet(sock);
- }
- }
+ if (res != null)
+ return res.get1();
if (timeout > 0 && U.millisSinceNanos(startNanos) > timeout)
return null;
@@ -669,6 +631,50 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
+ /** */
+ private T2<Boolean, T2<SocketStream, Integer>> sendJoinRequests(
+ boolean recon,
+ Collection<InetSocketAddress> addrs
+ ) throws InterruptedException {
+ for (InetSocketAddress addr : addrs) {
+ if (Thread.currentThread().isInterrupted())
+ throw new InterruptedException();
+
+ T2<SocketStream, Integer> sockAndRes = sendJoinRequest(recon,
addr);
+
+ if (sockAndRes == null)
+ continue;
+
+ assert sockAndRes.get1() != null && sockAndRes.get2() != null :
sockAndRes;
+
+ Socket sock = sockAndRes.get1().socket();
+
+ if (log.isDebugEnabled())
+ log.debug("Received response to join request [addr=" + addr +
", res=" + sockAndRes.get2() + ']');
+
+ switch (sockAndRes.get2()) {
+ case RES_OK:
+ return new T2<>(false, sockAndRes);
+
+ case RES_CONTINUE_JOIN:
+ case RES_WAIT:
+ U.closeQuiet(sock);
+
+ return new T2<>(true, null);
+
+ default:
+ if (log.isDebugEnabled())
+ log.debug("Received unexpected response to join
request: " + sockAndRes.get2());
+
+ U.closeQuiet(sock);
+ }
+ }
+
+ addrs.clear();
+
+ return new T2<>(false, null);
+ }
+
/** */
private static void sleepEx(long millis, Runnable before, Runnable after)
throws InterruptedException {
if (before != null)
@@ -688,8 +694,8 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param addr Address.
* @return Socket, connect response and client acknowledge support flag.
*/
- @Nullable private T3<SocketStream, Integer, Boolean>
sendJoinRequest(boolean recon,
- InetSocketAddress addr) {
+ @Nullable private T2<SocketStream, Integer> sendJoinRequest(boolean recon,
+ InetSocketAddress addr) throws InterruptedException {
assert addr != null;
if (log.isDebugEnabled())
@@ -724,16 +730,26 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryIoSession ses = createSession(sock);
- openSock = true;
-
TcpDiscoveryHandshakeRequest req = new
TcpDiscoveryHandshakeRequest(locNodeId);
req.client(true);
+ req.dcId(locNode.dataCenterId());
spi.writeMessage(ses, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(ses,
ackTimeout0);
+ if (res.redirectAddresses() != null) {
+ U.closeQuiet(sock);
+
+ if (log.isInfoEnabled())
+ log.info("Reconnecting to the addresses of a proper DC
[addrs=" + res.redirectAddresses() + ']');
+
+ T2<Boolean, T2<SocketStream, Integer>> redirectedRes =
sendJoinRequests(recon, res.redirectAddresses());
+
+ return redirectedRes.get2();
+ }
+
UUID rmtNodeId = res.creatorNodeId();
assert rmtNodeId != null;
@@ -793,9 +809,7 @@ class ClientImpl extends TcpDiscoveryImpl {
log.debug("Message has been sent to address [msg=" + msg +
", addr=" + addr +
", rmtNodeId=" + rmtNodeId + ']');
- return new T3<>(new SocketStream(sock),
- spi.readReceipt(sock,
timeoutHelper.nextTimeoutChunk(ackTimeout0)),
- res.clientAck());
+ return new T2<>(new SocketStream(sock), spi.readReceipt(sock,
timeoutHelper.nextTimeoutChunk(ackTimeout0)));
}
catch (IOException | IgniteCheckedException e) {
U.closeQuiet(sock);
@@ -1266,9 +1280,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private TcpDiscoveryIoSession ses;
- /** */
- private boolean clientAck;
-
/** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new
ArrayDeque<>();
@@ -1327,16 +1338,13 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
* @param sock Socket.
- * @param clientAck {@code True} is server supports client message
acknowlede.
*/
- private void setSocket(Socket sock, boolean clientAck) {
+ private void setSocket(Socket sock) {
synchronized (mux) {
this.sock = sock;
ses = createSession(sock);
- this.clientAck = clientAck;
-
unackedMsg = null;
mux.notifyAll();
@@ -1421,7 +1429,7 @@ class ClientImpl extends TcpDiscoveryImpl {
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr :
spi.sndMsgLsnrs)
msgLsnr.apply(msg);
- boolean ack = clientAck && !(msg instanceof
TcpDiscoveryPingResponse);
+ boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
try {
if (ack) {
@@ -1529,9 +1537,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
private volatile SocketStream sockStream;
- /** */
- private boolean clientAck;
-
/** */
private final boolean join;
@@ -1576,7 +1581,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
while (true) {
- T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr,
timeout, null, null);
+ SocketStream joinRes = joinTopology(prevAddr, timeout,
null, null);
if (joinRes == null) {
if (join) {
@@ -1591,8 +1596,7 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- sockStream = joinRes.get1();
- clientAck = joinRes.get2();
+ sockStream = joinRes;
Socket sock = sockStream.socket();
TcpDiscoveryIoSession ses = createSession(sock);
@@ -2087,7 +2091,7 @@ class ClientImpl extends TcpDiscoveryImpl {
joinCnt++;
- T2<SocketStream, Boolean> joinRes;
+ SocketStream joinRes;
try {
joinRes = joinTopology(null, spi.joinTimeout,
@@ -2120,9 +2124,9 @@ class ClientImpl extends TcpDiscoveryImpl {
return;
}
- currSock = joinRes.get1();
+ currSock = joinRes;
- sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
+ sockWriter.setSocket(joinRes.socket());
if (spi.joinTimeout > 0) {
final int joinCnt0 = joinCnt;
@@ -2132,7 +2136,7 @@ class ClientImpl extends TcpDiscoveryImpl {
}, spi.joinTimeout, MILLISECONDS);
}
- sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
+ sockReader.setSocket(joinRes, locNode.clientRouterNodeId());
}
/**
@@ -2532,7 +2536,7 @@ class ClientImpl extends TcpDiscoveryImpl {
currSock = reconnector.sockStream;
- sockWriter.setSocket(currSock.socket(),
reconnector.clientAck);
+ sockWriter.setSocket(currSock.socket());
sockReader.setSocket(currSock,
locNode.clientRouterNodeId());
reconnector = null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8b5bdadbebd..a4d6a8c503e 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -159,6 +159,8 @@ import org.apache.ignite.spi.tracing.SpanStatus;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_NODE_IDS_HISTORY_SIZE;
@@ -6833,8 +6835,35 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeResponse res =
new TcpDiscoveryHandshakeResponse(locNodeId,
locNode.internalOrder());
- if (req.client())
- res.clientAck(true);
+ if (req.client()) {
+ if (req.dcId() != null && !Objects.equals(req.dcId(),
locNode.dataCenterId())) {
+ List<TcpDiscoveryNode> dcNodes =
ring.serverNodes().stream()
+ .filter(TcpDiscoveryNode::visible)
+ .filter(node -> node.dataCenterId() != null &&
node.dataCenterId().equals(req.dcId()))
+ .collect(
+ collectingAndThen(
+ toList(),
+ l -> {
+ Collections.shuffle(l);
+ return l;
+ }
+ ));
+
+ if (!dcNodes.isEmpty()) {
+ Collection<InetSocketAddress> addrs = new
ArrayList<>(dcNodes.size());
+
+ for (TcpDiscoveryNode dcNode : dcNodes) {
+ addrs.addAll(dcNode.socketAddresses());
+ }
+
+ res.redirectAddresses(addrs);
+
+ spi.writeMessage(ses, res,
spi.getEffectiveSocketTimeout(srvSock));
+
+ return;
+ }
+ }
+ }
else if (req.changeTopology()) {
// Node cannot connect to it's next (for local node
it's previous).
// Need to check connectivity to it.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 505980bef3d..fbe4236d102 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -50,10 +50,7 @@ public abstract class TcpDiscoveryAbstractMessage implements
Serializable {
protected static final int CHANGE_TOPOLOGY_FLAG_POS = 3;
/** */
- protected static final int CLIENT_ACK_FLAG_POS = 4;
-
- /** */
- protected static final int FORCE_FAIL_FLAG_POS = 8;
+ protected static final int FORCE_FAIL_FLAG_POS = 4;
/** Sender of the message (transient). */
private transient UUID sndNodeId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
index 8a5981f814b..384caf1a46c 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
* Handshake request.
@@ -30,6 +31,9 @@ public class TcpDiscoveryHandshakeRequest extends
TcpDiscoveryAbstractMessage {
/** */
private UUID prevNodeId;
+ /** */
+ private String dcId;
+
/**
* Constructor.
*
@@ -69,6 +73,16 @@ public class TcpDiscoveryHandshakeRequest extends
TcpDiscoveryAbstractMessage {
this.prevNodeId = prevNodeId;
}
+ /** @return DataCenter id. */
+ @Nullable public String dcId() {
+ return dcId;
+ }
+
+ /** @param dcId DataCenter id. */
+ public void dcId(String dcId) {
+ this.dcId = dcId;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TcpDiscoveryHandshakeRequest.class, this, "super",
super.toString(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 2f5b76249c8..8eb1b2b6510 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -17,8 +17,11 @@
package org.apache.ignite.spi.discovery.tcp.messages;
+import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/**
* Handshake response.
@@ -30,6 +33,9 @@ public class TcpDiscoveryHandshakeResponse extends
TcpDiscoveryAbstractMessage {
/** */
private long order;
+ /** Redirect addresses. */
+ private Collection<InetSocketAddress> redirectAddresses;
+
/**
* Constructor.
*
@@ -80,18 +86,14 @@ public class TcpDiscoveryHandshakeResponse extends
TcpDiscoveryAbstractMessage {
this.order = order;
}
- /**
- * @return {@code True} if server supports client message acknowledge.
- */
- public boolean clientAck() {
- return getFlag(CLIENT_ACK_FLAG_POS);
+ /** @return Socket addresses list for redirect. */
+ @Nullable public Collection<InetSocketAddress> redirectAddresses() {
+ return redirectAddresses;
}
- /**
- * @param clientAck {@code True} if server supports client message
acknowledge.
- */
- public void clientAck(boolean clientAck) {
- setFlag(CLIENT_ACK_FLAG_POS, clientAck);
+ /** @param socketAddresses Socket addresses list for redirect. */
+ public void redirectAddresses(Collection<InetSocketAddress>
socketAddresses) {
+ this.redirectAddresses = socketAddresses;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/datacenter/MultiDataCenterClientRoutingTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/datacenter/MultiDataCenterClientRoutingTest.java
new file mode 100644
index 00000000000..1a997ada7db
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/datacenter/MultiDataCenterClientRoutingTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.datacenter;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/** */
+public class MultiDataCenterClientRoutingTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DC_ID_0 = "DC0";
+
+ /** */
+ private static final String DC_ID_1 = "DC1";
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ // Enforce different MAC addresses to emulate a distributed environment
by default.
+ cfg.setUserAttributes(Collections.singletonMap(
+ IgniteNodeAttributes.ATTR_MACS_OVERRIDE,
UUID.randomUUID().toString()));
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testConnectionToProperDc() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ boolean bool = rnd.nextBoolean();
+
+ int nodes = 20;
+
+ for (int i = 0; i < nodes; i += 2) {
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
bool ? DC_ID_0 : DC_ID_1);
+
+ startGrid(i);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
bool ? DC_ID_1 : DC_ID_0);
+
+ startGrid(i + 1);
+ }
+
+ awaitPartitionMapExchange();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, bool
? DC_ID_0 : DC_ID_1);
+
+ IgniteEx client = startClientGrid();
+
+ UUID routerId =
((TcpDiscoveryNode)client.localNode()).clientRouterNodeId();
+
+ List<ClusterNode> routers = client.cluster().nodes().stream()
+ .filter(node -> node.id().equals(routerId))
+ .collect(Collectors.toList());
+
+ assertTrue(routers.size() == 1);
+ assertEquals(bool ? DC_ID_0 : DC_ID_1, routers.get(0).dataCenterId());
+ }
+
+ /** */
+ @Test
+ public void testConnectionToAnyDcWhenNoOtherOption() throws Exception {
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_0);
+
+ startGrid(0);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_1);
+
+ IgniteEx client = startClientGrid();
+
+ UUID routerId =
((TcpDiscoveryNode)client.localNode()).clientRouterNodeId();
+
+ List<ClusterNode> routers = client.cluster().nodes().stream()
+ .filter(node -> node.id().equals(routerId))
+ .collect(Collectors.toList());
+
+ assertTrue(routers.size() == 1);
+ assertEquals(DC_ID_0, routers.get(0).dataCenterId());
+ }
+
+ /** */
+ @Test
+ public void testConnectionToUnconfiguredDcWhenNoOtherOption() throws
Exception {
+ startGrid(0);
+
+ System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
DC_ID_1);
+
+ IgniteEx client = startClientGrid();
+
+ UUID routerId =
((TcpDiscoveryNode)client.localNode()).clientRouterNodeId();
+
+ List<ClusterNode> routers = client.cluster().nodes().stream()
+ .filter(node -> node.id().equals(routerId))
+ .collect(Collectors.toList());
+
+ assertTrue(routers.size() == 1);
+ assertNull(routers.get(0).dataCenterId());
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MdcTopologyValidatorTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MultiDataCenterTopologyValidatorTest.java
similarity index 97%
rename from
modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MdcTopologyValidatorTest.java
rename to
modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MultiDataCenterTopologyValidatorTest.java
index fa30e32d404..7091268bb38 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MdcTopologyValidatorTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/failover/topology/validator/MultiDataCenterTopologyValidatorTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -37,7 +38,7 @@ import org.apache.ignite.topology.MdcTopologyValidator;
import org.junit.Test;
/** */
-public class MdcTopologyValidatorTest extends GridCommonAbstractTest {
+public class MultiDataCenterTopologyValidatorTest extends
GridCommonAbstractTest {
/** */
private static final String DC_ID_0 = "DC0";
@@ -326,7 +327,9 @@ public class MdcTopologyValidatorTest extends
GridCommonAbstractTest {
srv0.cluster().state(ClusterState.ACTIVE);
- CacheConfiguration<Object, Object> cfgCache = new
CacheConfiguration<>("cache").setTopologyValidator(topValidator);
+ CacheConfiguration<Object, Object> cfgCache = new
CacheConfiguration<>("cache")
+ .setTopologyValidator(topValidator)
+ .setCacheMode(CacheMode.REPLICATED);
return srv0.createCache(cfgCache);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index a2f818b1111..05d934f4fbf 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
import org.apache.ignite.spi.discovery.IgniteClientReconnectEventHandlingTest;
import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest;
import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest;
+import
org.apache.ignite.spi.discovery.datacenter.MultiDataCenterClientRoutingTest;
import
org.apache.ignite.spi.discovery.datacenter.MultiDataCenterDeploymentTest;
import org.apache.ignite.spi.discovery.tcp.DiscoveryClientSocketTest;
import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
@@ -184,7 +185,8 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryDeadNodeAddressResolvingTest.class,
- MultiDataCenterDeploymentTest.class
+ MultiDataCenterDeploymentTest.class,
+ MultiDataCenterClientRoutingTest.class,
})
public class IgniteSpiDiscoverySelfTestSuite {
/** */
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiFailoverSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiFailoverSelfTestSuite.java
index eb6268ed600..1fcf36195fb 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiFailoverSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiFailoverSelfTestSuite.java
@@ -26,7 +26,7 @@ import
org.apache.ignite.spi.failover.jobstealing.GridJobStealingFailoverSpiSelf
import
org.apache.ignite.spi.failover.jobstealing.GridJobStealingFailoverSpiStartStopSelfTest;
import org.apache.ignite.spi.failover.never.GridNeverFailoverSpiSelfTest;
import
org.apache.ignite.spi.failover.never.GridNeverFailoverSpiStartStopSelfTest;
-import
org.apache.ignite.spi.failover.topology.validator.MdcTopologyValidatorTest;
+import
org.apache.ignite.spi.failover.topology.validator.MultiDataCenterTopologyValidatorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -50,7 +50,7 @@ import org.junit.runners.Suite;
GridJobStealingFailoverSpiConfigSelfTest.class,
// Topology validator.
- MdcTopologyValidatorTest.class,
+ MultiDataCenterTopologyValidatorTest.class,
})
public class IgniteSpiFailoverSelfTestSuite {
}