This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d0ee3dc736 IGNITE-26909 Thin client: Optimizations for MultiDC - 
Fixes #12533.
4d0ee3dc736 is described below

commit 4d0ee3dc736f0e4f63aec0342b6298f6db33af39
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 2 14:37:06 2025 +0300

    IGNITE-26909 Thin client: Optimizations for MultiDC - Fixes #12533.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../client/thin/ClientCacheAffinityContext.java    |  97 ++++++-
 .../client/thin/ClientCacheAffinityMapping.java    |  94 +++++--
 .../client/thin/ClientFieldsQueryPager.java        |  19 ++
 .../internal/client/thin/ClientOperation.java      |   3 +
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/ReliableChannel.java      |  45 +++-
 .../internal/client/thin/TcpClientCache.java       |   4 +-
 .../platform/client/ClientBitmaskFeature.java      |   5 +-
 .../platform/client/ClientMessageParser.java       |   7 +
 .../cache/ClientCachePartitionAwarenessGroup.java  |   2 +-
 .../client/cache/ClientCachePartitionMapping.java  |  58 +++--
 .../client/cache/ClientCachePartitionsRequest.java |  86 +++++--
 .../ClientClusterGetDataCenterNodesRequest.java    |  58 +++++
 .../ClientClusterGetDataCenterNodesResponse.java   |  59 +++++
 .../org/apache/ignite/client/ReliabilityTest.java  |   2 +-
 .../ThinClientAbstractPartitionAwarenessTest.java  |  16 ++
 .../ThinClientPartitionAwarenessBalancingTest.java |  12 +-
 .../ThinClientPartitionAwarenessMultiDcTest.java   | 279 +++++++++++++++++++++
 ...lientPartitionAwarenessResourceReleaseTest.java |   4 +-
 .../org/apache/ignite/client/ClientTestSuite.java  |   2 +
 20 files changed, 762 insertions(+), 95 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index de42b6532d3..9ac655460ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -32,6 +35,8 @@ import java.util.function.Predicate;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.client.ClientPartitionAwarenessMapper;
 import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -46,6 +51,20 @@ public class ClientCacheAffinityContext {
     /** If a factory needs to be removed. */
     private static final long REMOVED_TS = 0;
 
+    /** Affinity operations, allowed to be executed on backup nodes. */
+    private static final EnumSet<ClientOperation> OPS_ALLOWED_ON_BACKUPS = 
EnumSet.of(
+        ClientOperation.CACHE_GET,
+        ClientOperation.CACHE_CONTAINS_KEY,
+        ClientOperation.ATOMIC_LONG_VALUE_GET,
+        ClientOperation.OP_SET_VALUE_CONTAINS,
+        ClientOperation.OP_SET_VALUE_CONTAINS_ALL,
+        ClientOperation.OP_SET_ITERATOR_START,
+        ClientOperation.QUERY_SQL,
+        ClientOperation.QUERY_SQL_FIELDS,
+        ClientOperation.QUERY_SCAN,
+        ClientOperation.QUERY_INDEX
+    );
+
     /**
      * Factory for each cache id to produce key to partition mapping functions.
      * This factory is also used to resolve cacheName from cacheId. If a cache 
has default affinity mappings then
@@ -74,18 +93,24 @@ public class ClientCacheAffinityContext {
     /** Predicate to check whether a connection to the node with the specified 
ID is open. */
     private final Predicate<UUID> connectionEstablishedPredicate;
 
+    /** Data center ID. */
+    private final String dataCenterId;
+
     /**
      * @param binary Binary data processor.
      * @param factory Factory for caches with custom affinity.
+     * @param dataCenterId Data center ID.
      */
     public ClientCacheAffinityContext(
         IgniteBinary binary,
         @Nullable ClientPartitionAwarenessMapperFactory factory,
-        Predicate<UUID> connectionEstablishedPredicate
+        Predicate<UUID> connectionEstablishedPredicate,
+        String dataCenterId
     ) {
         this.paMapFactory = factory;
         this.binary = binary;
         this.connectionEstablishedPredicate = connectionEstablishedPredicate;
+        this.dataCenterId = dataCenterId;
     }
 
     /**
@@ -150,7 +175,7 @@ public class ClientCacheAffinityContext {
 
         // In case of IO error rq can hold previous mapping request. Just 
overwrite it, we don't need it anymore.
         rq = new CacheMappingRequest(cacheIds, lastAccessed);
-        ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0);
+        ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0, 
dataCenterId);
     }
 
     /**
@@ -221,6 +246,36 @@ public class ClientCacheAffinityContext {
         return true;
     }
 
+    /**
+     * @param ch Payload output channel.
+     */
+    public void writeDataCenterNodesRequest(PayloadOutputChannel ch) {
+        try (BinaryWriterEx w = BinaryUtils.writer(null, ch.out(), null)) {
+            w.writeString(dataCenterId);
+        }
+    }
+
+    /**
+     * @param ch Payload input channel.
+     */
+    public boolean readDataCenterNodesResponse(PayloadInputChannel ch) {
+        TopologyNodes top = lastTop.get();
+
+        if (top == null)
+            return false;
+
+        int cnt = ch.in().readInt();
+
+        List<UUID> dcNodes = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            dcNodes.add(new UUID(ch.in().readLong(), ch.in().readLong()));
+
+        top.dcNodes = dcNodes;
+
+        return true;
+    }
+
     /**
      * Gets last topology information.
      */
@@ -246,12 +301,15 @@ public class ClientCacheAffinityContext {
      *
      * @param cacheId Cache ID.
      * @param key Key.
+     * @param op Client operation.
      * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and key.
      */
-    public UUID affinityNode(int cacheId, Object key) {
+    public UUID affinityNode(int cacheId, Object key, ClientOperation op) {
         ClientCacheAffinityMapping mapping = currentMapping();
 
-        return mapping == null ? null : mapping.affinityNode(binary, cacheId, 
key);
+        boolean primary = !OPS_ALLOWED_ON_BACKUPS.contains(op);
+
+        return mapping == null ? null : mapping.affinityNode(binary, cacheId, 
key, primary);
     }
 
     /**
@@ -259,12 +317,24 @@ public class ClientCacheAffinityContext {
      *
      * @param cacheId Cache ID.
      * @param part Partition.
+     * @param op Client operation.
      * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and partition.
      */
-    public UUID affinityNode(int cacheId, int part) {
+    public UUID affinityNode(int cacheId, int part, ClientOperation op) {
         ClientCacheAffinityMapping mapping = currentMapping();
 
-        return mapping == null ? null : mapping.affinityNode(cacheId, part);
+        boolean primary = !OPS_ALLOWED_ON_BACKUPS.contains(op);
+
+        return mapping == null ? null : mapping.affinityNode(cacheId, part, 
primary);
+    }
+
+    /**
+     * @return List of nodes in current data center.
+     */
+    public List<UUID> dataCenterNodes() {
+        TopologyNodes top = lastTop.get();
+
+        return top == null ? null : top.dataCenterNodes();
     }
 
     /**
@@ -314,6 +384,11 @@ public class ClientCacheAffinityContext {
         }
     }
 
+    /** */
+    public String dataCenterId() {
+        return dataCenterId;
+    }
+
     /** */
     private boolean isTopologyOutdated(TopologyNodes top, 
AffinityTopologyVersion srvSideTopVer) {
         if (top == null)
@@ -340,6 +415,9 @@ public class ClientCacheAffinityContext {
         /** Nodes. */
         private final Collection<UUID> nodes = new ConcurrentLinkedQueue<>();
 
+        /** Current data center nodes. */
+        private volatile List<UUID> dcNodes;
+
         /**
          * @param topVer Topology version.
          * @param nodeId Node id.
@@ -357,6 +435,13 @@ public class ClientCacheAffinityContext {
             return Collections.unmodifiableCollection(nodes);
         }
 
+        /**
+         * Gets nodes of current data center.
+         */
+        public List<UUID> dataCenterNodes() {
+            return dcNodes;
+        }
+
         /**
          * @return Topology version.
          */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
index ed25c44055c..4a944e0c691 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -31,12 +31,14 @@ import 
org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.ClientPartitionAwarenessMapper;
 import org.apache.ignite.internal.binary.BinaryReaderEx;
 import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
+import static 
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.DC_AWARE;
 
 /**
  * Affinity mapping (partition to nodes) for each cache.
@@ -44,16 +46,19 @@ import static 
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_
 public class ClientCacheAffinityMapping {
     /** CacheAffinityInfo for caches with not applicable partition awareness. 
*/
     private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
-        new CacheAffinityInfo(null, null, null);
+        new CacheAffinityInfo(null, null, null, null);
+
+    /** Empty part to node mapping. */
+    private static final UUID[] EMPTY_PART_MAPPING = new UUID[0];
 
     /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
     /** Affinity information for each cache. */
-    private final Map<Integer, CacheAffinityInfo> cacheAffinity = new 
HashMap<>();
+    private final Map<Integer, CacheAffinityInfo> cacheAff = new HashMap<>();
 
     /** Unmodifiable collection of cache IDs. To preserve instance 
immutability. */
-    private final Collection<Integer> cacheIds = 
Collections.unmodifiableCollection(cacheAffinity.keySet());
+    private final Collection<Integer> cacheIds = 
Collections.unmodifiableCollection(cacheAff.keySet());
 
     /**
      * @param topVer Topology version.
@@ -82,10 +87,11 @@ public class ClientCacheAffinityMapping {
      * @param binary Binary data processor (needed to extract affinity field 
from the key).
      * @param cacheId Cache ID.
      * @param key Key.
+     * @param primary Force primary node.
      * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and key.
      */
-    public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
-        CacheAffinityInfo affInfo = cacheAffinity.get(cacheId);
+    public UUID affinityNode(IgniteBinary binary, int cacheId, Object key, 
boolean primary) {
+        CacheAffinityInfo affInfo = cacheAff.get(cacheId);
 
         if (affInfo == null || affInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
             return null;
@@ -105,7 +111,7 @@ public class ClientCacheAffinityMapping {
             }
         }
 
-        return affInfo.nodeForKey(binaryKey);
+        return affInfo.nodeForKey(binaryKey, primary);
     }
 
     /**
@@ -113,15 +119,16 @@ public class ClientCacheAffinityMapping {
      *
      * @param cacheId Cache ID.
      * @param part Partition.
+     * @param primary Force primary node.
      * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and partition.
      */
-    public UUID affinityNode(int cacheId, int part) {
-        CacheAffinityInfo affInfo = cacheAffinity.get(cacheId);
+    public UUID affinityNode(int cacheId, int part, boolean primary) {
+        CacheAffinityInfo affInfo = cacheAff.get(cacheId);
 
         if (affInfo == null || affInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
             return null;
 
-        return affInfo.nodeForPartition(part);
+        return affInfo.nodeForPartition(part, primary);
     }
 
     /**
@@ -136,7 +143,7 @@ public class ClientCacheAffinityMapping {
             assert res.topVer.equals(mapping.topVer) : "Mappings must have 
identical topology versions [res.topVer=" +
                 res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
 
-            res.cacheAffinity.putAll(mapping.cacheAffinity);
+            res.cacheAff.putAll(mapping.cacheAff);
         }
 
         return res;
@@ -148,8 +155,14 @@ public class ClientCacheAffinityMapping {
      * @param ch Output channel.
      * @param cacheIds Set of cache ids to request.
      * @param customMappingsRequired {@code true} if non-default affinity 
mappings required.
+     * @param dcId Data center ID.
      */
-    public static void writeRequest(PayloadOutputChannel ch, 
Collection<Integer> cacheIds, boolean customMappingsRequired) {
+    public static void writeRequest(
+        PayloadOutputChannel ch,
+        Collection<Integer> cacheIds,
+        boolean customMappingsRequired,
+        String dcId
+    ) {
         ProtocolContext ctx = ch.clientChannel().protocolCtx();
 
         if (customMappingsRequired && 
!ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
@@ -160,6 +173,12 @@ public class ClientCacheAffinityMapping {
         if (ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
             out.writeBoolean(customMappingsRequired);
 
+        if (ctx.isFeatureSupported(DC_AWARE)) {
+            try (BinaryWriterEx w = BinaryUtils.writer(null, out, null)) {
+                w.writeString(dcId);
+            }
+        }
+
         out.writeInt(cacheIds.size());
 
         for (int cacheId : cacheIds)
@@ -197,7 +216,15 @@ public class ClientCacheAffinityMapping {
                     for (int j = 0; j < cachesCnt; j++)
                         cacheKeyCfg.put(in.readInt(), 
readCacheKeyConfiguration(in));
 
-                    UUID[] partToNode = readNodePartitions(in);
+                    UUID[] primaryPartToNode = readNodePartitions(in);
+                    UUID[] dcPartToNode = primaryPartToNode;
+
+                    if 
(ch.clientChannel().protocolCtx().isFeatureSupported(DC_AWARE)) {
+                        dcPartToNode = readNodePartitions(in);
+
+                        if (dcPartToNode.length == 0)
+                            dcPartToNode = primaryPartToNode;
+                    }
 
                     boolean dfltMapping = true;
 
@@ -212,13 +239,15 @@ public class ClientCacheAffinityMapping {
                         if (factory == null)
                             continue;
 
-                        aff.cacheAffinity.put(keyCfg.getKey(),
-                            new CacheAffinityInfo(keyCfg.getValue(), 
partToNode, factory.apply(partToNode.length)));
+                        aff.cacheAff.put(keyCfg.getKey(),
+                            new CacheAffinityInfo(keyCfg.getValue(), 
primaryPartToNode, dcPartToNode,
+                                factory.apply(primaryPartToNode.length))
+                        );
                     }
                 }
                 else { // Partition awareness is not applicable for these 
caches.
                     for (int j = 0; j < cachesCnt; j++)
-                        aff.cacheAffinity.put(in.readInt(), 
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
+                        aff.cacheAff.put(in.readInt(), 
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
                 }
             }
 
@@ -249,6 +278,9 @@ public class ClientCacheAffinityMapping {
     private static UUID[] readNodePartitions(BinaryReaderEx in) {
         int nodesCnt = in.readInt();
 
+        if (nodesCnt == 0)
+            return EMPTY_PART_MAPPING;
+
         int maxPart = -1;
 
         UUID[] partToNode = new UUID[1024];
@@ -283,20 +315,30 @@ public class ClientCacheAffinityMapping {
         /** Key configuration. */
         private final Map<Integer, Integer> keyCfg;
 
-        /** Partition mapping. */
-        private final UUID[] partMapping;
+        /** Primary partition mapping. */
+        private final UUID[] primaryPartMapping;
+
+        /** Partition mapping to nodes located in current DC. */
+        private final UUID[] dcPartMapping;
 
         /** Mapper a cache key to a partition. */
         private final ClientPartitionAwarenessMapper keyMapper;
 
         /**
          * @param keyCfg Cache key configuration or {@code null} if partition 
awareness is not applicable for this cache.
-         * @param partMapping Partition to node mapping or {@code null} if 
partition awareness is not applicable for
-         * this cache.
+         * @param primaryPartMapping Primary partition to node mapping or 
{@code null} if partition awareness
+         * is not applicable for this cache.
+         * @param dcPartMapping Partition to node mapping located in current 
DC.
          */
-        private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] 
partMapping, ClientPartitionAwarenessMapper keyMapper) {
+        private CacheAffinityInfo(
+            Map<Integer, Integer> keyCfg,
+            UUID[] primaryPartMapping,
+            UUID[] dcPartMapping,
+            ClientPartitionAwarenessMapper keyMapper
+        ) {
             this.keyCfg = keyCfg;
-            this.partMapping = partMapping;
+            this.primaryPartMapping = primaryPartMapping;
+            this.dcPartMapping = dcPartMapping;
             this.keyMapper = keyMapper;
         }
 
@@ -304,20 +346,24 @@ public class ClientCacheAffinityMapping {
          * Calculates node for given key.
          *
          * @param key Key.
+         * @param primary Force primary node.
          */
-        private UUID nodeForKey(Object key) {
+        private UUID nodeForKey(Object key, boolean primary) {
             if (keyMapper == null)
                 return null;
 
-            return nodeForPartition(keyMapper.partition(key));
+            return nodeForPartition(keyMapper.partition(key), primary);
         }
 
         /**
          * Calculates node for given partition.
          *
          * @param part Partition.
+         * @param primary Force primary node.
          */
-        private UUID nodeForPartition(int part) {
+        private UUID nodeForPartition(int part, boolean primary) {
+            UUID[] partMapping = primary ? primaryPartMapping : dcPartMapping;
+
             if (part < 0 || partMapping == null || part >= partMapping.length)
                 return null;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
index 157de864383..4116c851ab5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
@@ -38,6 +38,25 @@ class ClientFieldsQueryPager extends 
GenericQueryPager<List<?>> implements Field
     /** Serializer/deserializer. */
     private final ClientUtils serDes;
 
+    /** Constructor. */
+    ClientFieldsQueryPager(
+        ReliableChannel ch,
+        @Nullable TcpClientTransaction tx,
+        ClientOperation qryOp,
+        ClientOperation pageQryOp,
+        Consumer<PayloadOutputChannel> qryWriter,
+        boolean keepBinary,
+        ClientBinaryMarshaller marsh,
+        int cacheId,
+        int partId
+    ) {
+        super(ch, tx, qryOp, pageQryOp, qryWriter, cacheId, partId);
+
+        this.keepBinary = keepBinary;
+
+        serDes = new ClientUtils(marsh);
+    }
+
     /** Constructor. */
     ClientFieldsQueryPager(
         ReliableChannel ch,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 6f0f31e89f1..112dc47c712 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -207,6 +207,9 @@ public enum ClientOperation {
     /** Get nodes endpoints. */
     CLUSTER_GROUP_GET_NODE_ENDPOINTS(5102),
 
+    /** Get nodes of data center. */
+    CLUSTER_GET_DC_NODES(5103),
+
     /** Execute compute task. */
     COMPUTE_TASK_EXECUTE(6000),
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index fb2508523a7..9277fdfd9ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -109,7 +109,10 @@ public enum ProtocolBitmaskFeature {
      *
      * @see GridCacheProcessor#dynamicStartCache(CacheConfiguration, String, 
NearCacheConfiguration, boolean, boolean, boolean)
      */
-    SQL_CACHE_CREATION(21);
+    SQL_CACHE_CREATION(21),
+
+    /** Data-center information. */
+    DC_AWARE(22);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e03eb948781..cacc7aa5ed0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.client.thin;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +40,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.client.ClientAuthenticationException;
 import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
@@ -142,10 +142,16 @@ final class ReliableChannel implements AutoCloseable {
 
         partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
+        String dcId = 
IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+        if (dcId == null && !F.isEmpty(clientCfg.getUserAttributes()))
+            dcId = 
clientCfg.getUserAttributes().get(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
         affinityCtx = new ClientCacheAffinityContext(
             binary,
             clientCfg.getPartitionAwarenessMapperFactory(),
-            this::isConnectionEstablished
+            this::isConnectionEstablished,
+            dcId
         );
 
         discoveryCtx = new ClientDiscoveryContext(clientCfg);
@@ -191,7 +197,7 @@ final class ReliableChannel implements AutoCloseable {
         Consumer<PayloadOutputChannel> payloadWriter,
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException, ClientError {
-        return service(op, payloadWriter, payloadReader, 
Collections.emptyList());
+        return service(op, payloadWriter, payloadReader, 
affinityCtx.dataCenterNodes());
     }
 
     /**
@@ -246,7 +252,14 @@ final class ReliableChannel implements AutoCloseable {
         List<ClientConnectionException> failures
     ) {
         try {
-            ClientChannel ch = applyOnDefaultChannel(Function.identity(), 
null, failures);
+            List<UUID> targetNodes = affinityCtx.dataCenterNodes();
+
+            ClientChannel ch = F.isEmpty(targetNodes)
+                ? applyOnDefaultChannel(Function.identity(), null, failures)
+                : applyOnNodeChannelWithFallback(
+                    
targetNodes.get(ThreadLocalRandom.current().nextInt(targetNodes.size())),
+                    Function.identity(),
+                null);
 
             applyOnClientChannelAsync(fut, ch, op, payloadWriter, 
payloadReader, failures);
         }
@@ -401,7 +414,7 @@ final class ReliableChannel implements AutoCloseable {
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException, ClientError {
         if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
-            UUID affNodeId = affinityCtx.affinityNode(cacheId, key);
+            UUID affNodeId = affinityCtx.affinityNode(cacheId, key, op);
 
             if (affNodeId != null) {
                 return applyOnNodeChannelWithFallback(affNodeId, channel ->
@@ -423,7 +436,7 @@ final class ReliableChannel implements AutoCloseable {
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException, ClientError {
         if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
-            UUID affNodeId = affinityCtx.affinityNode(cacheId, part);
+            UUID affNodeId = affinityCtx.affinityNode(cacheId, part, op);
 
             if (affNodeId != null) {
                 return applyOnNodeChannelWithFallback(affNodeId, channel ->
@@ -445,7 +458,7 @@ final class ReliableChannel implements AutoCloseable {
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException, ClientError {
         if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
-            UUID affNodeId = affinityCtx.affinityNode(cacheId, key);
+            UUID affNodeId = affinityCtx.affinityNode(cacheId, key, op);
 
             if (affNodeId != null) {
                 CompletableFuture<T> fut = new CompletableFuture<>();
@@ -505,10 +518,20 @@ final class ReliableChannel implements AutoCloseable {
                         if (lastTop != affinityCtx.lastTopology())
                             return false;
 
-                        Boolean result = applyOnNodeChannel(nodeId, channel ->
-                            channel.service(ClientOperation.CACHE_PARTITIONS,
-                                affinityCtx::writePartitionsUpdateRequest,
-                                affinityCtx::readPartitionsUpdateResponse),
+                        Boolean result = applyOnNodeChannel(nodeId, channel -> 
{
+                                boolean updated = 
channel.service(ClientOperation.CACHE_PARTITIONS,
+                                    affinityCtx::writePartitionsUpdateRequest,
+                                    affinityCtx::readPartitionsUpdateResponse);
+
+                                if (updated && affinityCtx.dataCenterId() != 
null
+                                    && 
channel.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.DC_AWARE)) {
+                                    
channel.service(ClientOperation.CLUSTER_GET_DC_NODES,
+                                        
affinityCtx::writeDataCenterNodesRequest,
+                                        
affinityCtx::readDataCenterNodesResponse);
+                                }
+
+                                return updated;
+                            },
                             failures
                         );
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index b24dbb2eefd..36eca1ca954 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -1387,7 +1387,9 @@ public class TcpClientCache<K, V> implements 
ClientCache<K, V> {
             ClientOperation.QUERY_SQL_CURSOR_GET_PAGE,
             qryWriter,
             keepBinary,
-            marsh
+            marsh,
+            cacheId,
+            qry.getPartitions().length >= 1 ? qry.getPartitions()[0] : -1
         ));
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index f7115d41af3..3aebc1c58bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -106,7 +106,10 @@ public enum ClientBitmaskFeature implements 
ThinProtocolFeature {
      * Set this flag required when creating caches during dump restoration and 
similar processes.
      * @see GridCacheProcessor#dynamicStartCache(CacheConfiguration, String, 
NearCacheConfiguration, boolean, boolean, boolean)
      */
-    SQL_CACHE_CREATION(21);
+    SQL_CACHE_CREATION(21),
+
+    /** Data-center information. */
+    DC_AWARE(22);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 1cba38e7d16..d0f54ef2136 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -77,6 +77,7 @@ import 
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSc
 import 
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
 import 
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
 import 
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest;
+import 
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetDataCenterNodesRequest;
 import 
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetStateRequest;
 import 
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodeIdsRequest;
 import 
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodesDetailsRequest;
@@ -313,6 +314,9 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
     /** */
     private static final short OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS = 5102;
 
+    /** */
+    private static final short OP_CLUSTER_GET_DATA_CENTER_NODES = 5103;
+
     /* Compute operations. */
     /** */
     private static final short OP_COMPUTE_TASK_EXECUTE = 6000;
@@ -641,6 +645,9 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
             case OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS:
                 return new ClientClusterGroupGetNodesEndpointsRequest(reader);
 
+            case OP_CLUSTER_GET_DATA_CENTER_NODES:
+                return new ClientClusterGetDataCenterNodesRequest(reader);
+
             case OP_COMPUTE_TASK_EXECUTE:
                 return new ClientExecuteTaskRequest(reader);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 333cc8b3f2b..3a430affdc5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -74,7 +74,7 @@ class ClientCachePartitionAwarenessGroup {
                 writeCacheKeyConfiguration(writer, proc, 
entry.getValue().getKeyConfiguration());
             }
 
-            mapping.write(writer);
+            mapping.write(cpctx, writer);
 
             if 
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
                 writer.writeBoolean(dfltAffinity);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
index 8a9c641812b..66978ed1767 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
@@ -17,45 +17,60 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import 
org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
+import 
org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Cache partition mapping.
  */
 public class ClientCachePartitionMapping {
-    /** Partitions map for caches. */
-    private final HashMap<UUID, Set<Integer>> partitionMap;
+    /** Primary partitions map for caches. */
+    private final Map<UUID, Set<Integer>> primaryPartitionMap;
+
+    /** Partitions map to nodes located in current data center for caches. */
+    @Nullable private final Map<UUID, Set<Integer>> dcPartitionMap;
 
     /**
-     * @param assignment Affinity assignment.
+     * @param primaryPartitionMap Primary partition mapping.
+     * @param dcPartitionMap Partition mapping to nodes located in current 
data center.
      */
-    public ClientCachePartitionMapping(AffinityAssignment assignment) {
-        Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
-
-        partitionMap = new HashMap<>(nodes.size());
-
-        for (ClusterNode node : nodes) {
-            UUID nodeId = node.id();
-            Set<Integer> parts = assignment.primaryPartitions(nodeId);
-
-            partitionMap.put(nodeId, parts);
-        }
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public ClientCachePartitionMapping(
+        Map<UUID, Set<Integer>> primaryPartitionMap,
+        @Nullable Map<UUID, Set<Integer>> dcPartitionMap
+    ) {
+        this.primaryPartitionMap = primaryPartitionMap;
+        this.dcPartitionMap = dcPartitionMap;
     }
 
     /**
      * Write mapping using binary writer.
      * @param writer Writer.
      */
-    public void write(BinaryRawWriter writer) {
+    public void write(ClientProtocolContext ctx, BinaryRawWriter writer) {
+        writePartitionMap(writer, primaryPartitionMap);
+
+        if (ctx.isFeatureSupported(ClientBitmaskFeature.DC_AWARE))
+            writePartitionMap(writer, dcPartitionMap);
+    }
+
+    /** */
+    private static void writePartitionMap(BinaryRawWriter writer, @Nullable 
Map<UUID, Set<Integer>> partitionMap) {
+        if (partitionMap == null) {
+            writer.writeInt(0);
+
+            return;
+        }
+
         writer.writeInt(partitionMap.size());
 
-        for (HashMap.Entry<UUID, Set<Integer>> nodeParts: 
partitionMap.entrySet()) {
+        for (Map.Entry<UUID, Set<Integer>> nodeParts: partitionMap.entrySet()) 
{
             UUID nodeUuid = nodeParts.getKey();
             Set<Integer> parts = nodeParts.getValue();
 
@@ -77,11 +92,12 @@ public class ClientCachePartitionMapping {
 
         ClientCachePartitionMapping mapping = (ClientCachePartitionMapping)o;
 
-        return Objects.equals(partitionMap, mapping.partitionMap);
+        return Objects.equals(primaryPartitionMap, mapping.primaryPartitionMap)
+            && Objects.equals(dcPartitionMap, mapping.dcPartitionMap);
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        return Objects.hash(partitionMap);
+        return Objects.hash(primaryPartitionMap, dcPartitionMap);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
index 0fbf71b3277..152d904f8b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
@@ -20,14 +20,18 @@ package 
org.apache.ignite.internal.processors.platform.client.cache;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -59,6 +63,9 @@ public class ClientCachePartitionsRequest extends 
ClientRequest {
      */
     private final boolean withCustomMappings;
 
+    /** Data center ID. */
+    private final String dcId;
+
     /**
      * Initializes a new instance of ClientRawRequest class.
      * @param reader Reader.
@@ -71,6 +78,11 @@ public class ClientCachePartitionsRequest extends 
ClientRequest {
         else
             withCustomMappings = false;
 
+        if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.DC_AWARE))
+            dcId = reader.readString();
+        else
+            dcId = null;
+
         int len = reader.readInt();
 
         cacheIds = new int[len];
@@ -131,28 +143,21 @@ public class ClientCachePartitionsRequest extends 
ClientRequest {
     /**
      * Process cache and create new partition mapping, if it does not belong 
to any existent.
      * @param ctx Connection context.
-     * @param affinityVer Affinity topology version.
+     * @param affVer Affinity topology version.
      * @param cacheDesc Cache descriptor.
      * @param withCustomMappings {@code true} to verify a non-default affinity 
function also.
      * @return Null if cache was processed and new client cache partition 
awareness group if it does not belong to any
      * existent.
      */
-    private static ClientCachePartitionAwarenessGroup processCache(
+    private ClientCachePartitionAwarenessGroup processCache(
         ClientConnectionContext ctx,
-        ClientAffinityTopologyVersion affinityVer,
+        ClientAffinityTopologyVersion affVer,
         DynamicCacheDescriptor cacheDesc,
         boolean withCustomMappings
     ) {
-        AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer, 
cacheDesc.cacheId());
-
-        // If assignment is not available for the cache for required affinity 
version, ignore the cache.
-        if (assignment == null)
-            return null;
-
-        ClientCachePartitionMapping mapping = null;
-
-        if (isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings))
-            mapping = new ClientCachePartitionMapping(assignment);
+        ClientCachePartitionMapping mapping = 
isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings)
+            ? getCachePartitionMapping(ctx, affVer, cacheDesc.cacheId(), dcId)
+            : null;
 
         return new ClientCachePartitionAwarenessGroup(mapping,
             !withCustomMappings || 
isDefaultMapping(cacheDesc.cacheConfiguration()));
@@ -161,15 +166,58 @@ public class ClientCachePartitionsRequest extends 
ClientRequest {
     /**
      * Get assignment for a cache in a safe way.
      * @param ctx Client connection context.
-     * @param affinityVer Affinity version.
+     * @param affVer Affinity version.
      * @param cacheId Cache ID.
-     * @return Affinity assignment for a cache, or null if is not possible to 
get.
+     * @param dcId Data center ID.
+     * @return Partition mapping for a cache, or null if is not possible to 
get.
      */
-    @Nullable private static AffinityAssignment 
getCacheAssignment(ClientConnectionContext ctx,
-        ClientAffinityTopologyVersion affinityVer, int cacheId) {
+    @Nullable private static ClientCachePartitionMapping 
getCachePartitionMapping(
+        ClientConnectionContext ctx,
+        ClientAffinityTopologyVersion affVer,
+        int cacheId,
+        String dcId
+    ) {
         try {
-            GridCacheContext cacheCtx = 
ctx.kernalContext().cache().context().cacheContext(cacheId);
-            return cacheCtx.affinity().assignment(affinityVer.getVersion());
+            GridCacheContext<?, ?> cacheCtx = 
ctx.kernalContext().cache().context().cacheContext(cacheId);
+
+            AffinityAssignment assignment = 
cacheCtx.affinity().assignment(affVer.getVersion());
+
+            Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
+
+            Map<UUID, Set<Integer>> primaryPartitionMap = new 
HashMap<>(nodes.size());
+
+            for (ClusterNode node : nodes) {
+                UUID nodeId = node.id();
+                Set<Integer> parts = assignment.primaryPartitions(nodeId);
+
+                primaryPartitionMap.put(nodeId, parts);
+            }
+
+            Map<UUID, Set<Integer>> dcPartitionMap = null;
+
+            if (dcId != null && cacheCtx.config().isReadFromBackup()
+                && cacheCtx.config().getWriteSynchronizationMode() != 
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
+                // Filter partitions, located in current DC.
+                dcPartitionMap = new HashMap<>();
+
+                List<List<ClusterNode>> partAssignments = 
assignment.assignment();
+
+                for (int p = 0; p < partAssignments.size(); p++) {
+                    List<ClusterNode> partAssignment = partAssignments.get(p);
+
+                    ClusterNode node = F.find(partAssignment, null, n -> 
dcId.equals(n.dataCenterId()));
+
+                    if (node != null)
+                        dcPartitionMap.computeIfAbsent(node.id(), id -> new 
HashSet<>()).add(p);
+                    else {
+                        dcPartitionMap = null;
+
+                        break;
+                    }
+                }
+            }
+
+            return new ClientCachePartitionMapping(primaryPartitionMap, 
dcPartitionMap);
         }
         catch (Exception e) {
             return null;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
new file mode 100644
index 00000000000..95ab92e41fb
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cluster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Cluster get data center nodes request .
+ */
+public class ClientClusterGetDataCenterNodesRequest extends ClientRequest {
+    /** */
+    private final String dcId;
+
+    /**
+     * Constructor.
+     *
+     * @param reader Reader.
+     */
+    public ClientClusterGetDataCenterNodesRequest(BinaryRawReader reader) {
+        super(reader);
+
+        dcId = reader.readString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        if (dcId == null)
+            throw new IllegalArgumentException("Data center ID cannot be 
null");
+
+        Collection<ClusterNode> srvNodes = 
ctx.kernalContext().discovery().aliveServerNodes();
+
+        Collection<UUID> nodeIds = F.viewReadOnly(srvNodes, ClusterNode::id, n 
-> dcId.equals(n.dataCenterId()));
+
+        return new ClientClusterGetDataCenterNodesResponse(requestId(), 
nodeIds);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
new file mode 100644
index 00000000000..ac89a1f276d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cluster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
+import 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Cluster group get data center nodes response.
+ */
+public class ClientClusterGetDataCenterNodesResponse extends ClientResponse {
+    /** Node ids. */
+    private final Collection<UUID> nodeIds;
+
+    /**
+     * Constructor.
+     *
+     * @param reqId Request id.
+     * @param nodeIds Node ids.
+     */
+    public ClientClusterGetDataCenterNodesResponse(long reqId, 
Collection<UUID> nodeIds) {
+        super(reqId);
+        this.nodeIds = nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void encode(ClientConnectionContext ctx, BinaryWriterEx 
writer) {
+        super.encode(ctx, writer);
+
+        int cnt = 0;
+        int pos = writer.reserveInt();
+
+        for (UUID nodeId: nodeIds) {
+            cnt++;
+            writer.writeLong(nodeId.getMostSignificantBits());
+            writer.writeLong(nodeId.getLeastSignificantBits());
+        }
+
+        writer.writeInt(pos, cnt);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 86e00d30137..67206d56b0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -356,7 +356,7 @@ public class ReliabilityTest extends AbstractThinClientTest 
{
 
         String nullOpsNames = 
nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
 
-        long expectedNullCnt = 23;
+        long expectedNullCnt = 24;
 
         String msg = nullOps.size()
                 + " operation codes do not have public equivalent. When adding 
new codes, update ClientOperationType too. Missing ops: "
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 8430fc5ff81..6bc4c3dec45 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
@@ -88,6 +89,9 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
     /** Channels. */
     protected final TestTcpClientChannel[] channels = new 
TestTcpClientChannel[MAX_CLUSTER_SIZE];
 
+    /** Channels list. */
+    protected final List<TestTcpClientChannel> channelsList = 
Collections.unmodifiableList(F.asList(channels));
+
     /** Operations queue. */
     protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue 
= new ConcurrentLinkedQueue<>();
 
@@ -183,6 +187,18 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
         }
     }
 
+    /**
+     * Next operation channel index.
+     */
+    protected int nextOpChannelIdx() {
+        T2<TestTcpClientChannel, ClientOperation> nextOp = opsQueue.poll();
+
+        if (nextOp == null)
+            return -1;
+
+        return channelsList.indexOf(nextOp.get1());
+    }
+
     /**
      * Calculates affinity channel for cache and key.
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
index f89dffe8c3f..903ea7dddd3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
@@ -18,9 +18,6 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.util.BitSet;
-import java.util.List;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.junit.Test;
 
 /**
@@ -39,12 +36,13 @@ public class ThinClientPartitionAwarenessBalancingTest 
extends ThinClientAbstrac
         for (int i = 0; i < 100; i++)
             client.cacheNames(); // Non-affinity requests should be randomly 
distributed among connections.
 
-        List<TestTcpClientChannel> channelList = F.asList(channels);
+        while (true) {
+            int channelIdx = nextOpChannelIdx();
 
-        while (!opsQueue.isEmpty()) {
-            T2<TestTcpClientChannel, ClientOperation> op = opsQueue.poll();
+            if (channelIdx < 0)
+                break;
 
-            usedConnections.set(channelList.indexOf(op.get1()));
+            usedConnections.set(channelIdx);
         }
 
         assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 = 
set of {0, 1, 2}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
new file mode 100644
index 00000000000..e35025ee6e2
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import 
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Test partition awareness of thin client in multi data-center environment.
+ */
+public class ThinClientPartitionAwarenessMultiDcTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** */
+    private static final String DC1 = "dc1";
+
+    /** */
+    private static final String DC2 = "dc2";
+
+    /** */
+    private static final String DC3 = "dc3";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0, DC1);
+        startGrid(1, DC1);
+
+        startGrid(2, DC2);
+        startGrid(3, DC2);
+    }
+
+    /** */
+    private IgniteEx startGrid(int idx, String dcId) throws Exception {
+        return startGrid(getConfiguration(getTestIgniteInstanceName(idx))
+            .setClientConnectorConfiguration(new ClientConnectorConfiguration()
+                .setThinClientConfiguration(new ThinClientConfiguration()
+                    .setMaxActiveComputeTasksPerConnection(1)))
+            
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, 
dcId)));
+    }
+
+    /** */
+    private void initClient(
+        ClientConfiguration clientCfg,
+        String dcId,
+        int... chIdxs
+    ) throws IgniteInterruptedCheckedException {
+        
initClient(clientCfg.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
 dcId)), chIdxs);
+    }
+
+    /** */
+    @Test
+    public void testPartitionAwarenessRequests() throws Exception {
+        List<IgniteEx> curDcNodes = F.asList(grid(0), grid(1));
+        List<IgniteEx> anotherDcNodes = F.asList(grid(2), grid(3));
+
+        initClient(getClientConfiguration(0), DC1, 0, 1, 2, 3);
+
+        checkDcAwarePaRequests("test0", 
CacheWriteSynchronizationMode.PRIMARY_SYNC, true,
+            curDcNodes, anotherDcNodes, false);
+
+        checkDcAwarePaRequests("test1", 
CacheWriteSynchronizationMode.FULL_SYNC, false,
+            curDcNodes, anotherDcNodes, false);
+
+        checkDcAwarePaRequests("test2", 
CacheWriteSynchronizationMode.FULL_SYNC, true,
+            curDcNodes, anotherDcNodes, true);
+
+        checkDcAwarePaRequests("test3", 
CacheWriteSynchronizationMode.FULL_ASYNC, true,
+            curDcNodes, anotherDcNodes, true);
+    }
+
+    /** */
+    @Test
+    public void testPartitionAwarenessRequestsNoNodesInDc() throws Exception {
+        initClient(getClientConfiguration(0), DC3, 0, 1, 2, 3);
+
+        checkDcAwarePaRequests("test4", 
CacheWriteSynchronizationMode.FULL_SYNC, true,
+            Collections.emptyList(), F.asList(grid(0), grid(1), grid(2), 
grid(3)), false);
+    }
+
+    /** */
+    private void checkDcAwarePaRequests(
+        String cacheName,
+        CacheWriteSynchronizationMode mode,
+        boolean readFromBackup,
+        List<IgniteEx> curDcNodes,
+        List<IgniteEx> otherDcNodes,
+        boolean expReqToBackup
+    ) throws Exception {
+        grid(0).createCache(new CacheConfiguration<>(cacheName)
+            .setBackups(1)
+            .setWriteSynchronizationMode(mode)
+            .setReadFromBackup(readFromBackup)
+            .setAffinity(new 
RendezvousAffinityFunction().setAffinityBackupFilter(
+                new 
ClusterNodeAttributeAffinityBackupFilter(IgniteSystemProperties.IGNITE_DATA_CENTER_ID)))
+        );
+
+        ClientCache<Object, Object> cache = client.cache(cacheName);
+
+        // Init partitions request.
+        cache.put(0, 0);
+
+        // Ignore first put and partitions request.
+        opsQueue.clear();
+
+        Set<Integer> curDcNodeIdxs = new HashSet<>();
+
+        for (IgniteEx primaryNode : curDcNodes) {
+            curDcNodeIdxs.add(getTestIgniteInstanceIndex(primaryNode.name()));
+
+            List<Integer> keys = primaryKeys(primaryNode.cache(cacheName), 10);
+
+            for (Integer key : keys) {
+                int primaryNodeIdx = 
getTestIgniteInstanceIndex(primaryNode.name());
+
+                // If primary in current DC, read and write requests always 
sent to primary node.
+                cache.put(key, 0);
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_PUT);
+
+                cache.putAsync(key, 0).get();
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_PUT);
+
+                cache.get(key);
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_GET);
+
+                cache.getAsync(key).get();
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_GET);
+            }
+        }
+
+        for (IgniteEx primaryNode : otherDcNodes) {
+            List<Integer> keys = primaryKeys(primaryNode.cache(cacheName), 10);
+
+            for (Integer key : keys) {
+                int primaryNodeIdx = 
getTestIgniteInstanceIndex(primaryNode.name());
+
+                // If primary in another DC, write requests always sent to 
primary node.
+                cache.put(key, 0);
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_PUT);
+
+                cache.putAsync(key, 0).get();
+
+                assertOpOnChannel(channels[primaryNodeIdx], 
ClientOperation.CACHE_PUT);
+
+                int backupNodeIdx = getTestIgniteInstanceIndex(backupNode(key, 
cacheName).name());
+
+                assertTrue(curDcNodeIdxs.isEmpty() || 
curDcNodeIdxs.contains(backupNodeIdx));
+
+                // But read requests can be sent to current DC backup node.
+                cache.get(key);
+
+                assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx : 
primaryNodeIdx], ClientOperation.CACHE_GET);
+
+                cache.getAsync(key).get();
+
+                assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx : 
primaryNodeIdx], ClientOperation.CACHE_GET);
+
+                cache.containsKey(key);
+
+                assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx : 
primaryNodeIdx], ClientOperation.CACHE_CONTAINS_KEY);
+
+                int partIdx = 
primaryNode.context().affinity().partition(cacheName, key);
+
+                cache.query(new ScanQuery<>().setPartition(partIdx)).getAll();
+
+                assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx : 
primaryNodeIdx], ClientOperation.QUERY_SCAN);
+            }
+        }
+    }
+
+    /** */
+    @Test
+    public void testNonPartitionAwarenessRequests() throws Exception {
+        String cacheName = "txCache0";
+
+        grid(0).createCache(new 
CacheConfiguration<>(cacheName).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        checkDcAwareNonPaRequests(cacheName, DC1, 0, 1);
+        checkDcAwareNonPaRequests(cacheName, DC2, 2, 3);
+    }
+
+    /** */
+    @Test
+    public void testNonPartitionAwarenessRequestsNoNodesInDc() throws 
Exception {
+        String cacheName = "txCache1";
+
+        grid(0).createCache(new 
CacheConfiguration<>(cacheName).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        checkDcAwareNonPaRequests(cacheName, DC3, 0, 1, 2, 3);
+    }
+
+    /** */
+    private void checkDcAwareNonPaRequests(String cacheName, String dcId, 
int... expNodeIdxs) throws Exception {
+        initClient(getClientConfiguration(0), dcId, 0, 1, 2, 3);
+
+        ClientCache<Object, Object> cache = client.cache(cacheName);
+
+        cache.put(0, 0);
+
+        opsQueue.clear();
+
+        for (int i = 0; i < 10; i++) {
+            cache.query(new ScanQuery<>()).getAll();
+
+            assertExpectedChannel(expNodeIdxs);
+
+            client.cluster().state();
+
+            assertExpectedChannel(expNodeIdxs);
+
+            client.compute().executeAsync2(TestTask.class.getName(), 
null).get();
+
+            assertExpectedChannel(expNodeIdxs);
+
+            try (ClientTransaction tx = client.transactions().txStart()) {
+                cache.put(ThreadLocalRandom.current().nextInt(10), 0);
+                cache.get(ThreadLocalRandom.current().nextInt(10));
+
+                tx.commit();
+            }
+
+            while (true) {
+                int channelIdx = nextOpChannelIdx();
+
+                if (channelIdx < 0)
+                    break;
+
+                assertTrue(F.contains(expNodeIdxs, channelIdx));
+            }
+        }
+    }
+
+    /** */
+    private void assertExpectedChannel(int... expChannelIdxs) {
+        int channelIdx = nextOpChannelIdx();
+
+        assertTrue(F.contains(expChannelIdxs, channelIdx));
+
+        assertTrue("Ops queue not empty: " + opsQueue, F.isEmpty(opsQueue));
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 6de86591c82..6ddc0833435 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -101,7 +101,7 @@ public class 
ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         awaitPartitionMapExchange();
 
         // Cache destroyed, but mappings still exist on the client side.
-        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, 
Integer.valueOf(0)));
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, 
Integer.valueOf(0), ClientOperation.CACHE_PUT));
 
         client.cache(PART_CACHE_NAME).put(1, 1);
 
@@ -116,7 +116,7 @@ public class 
ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         }, 5_000L));
 
         // Mapping for previous caches become outdated and will be updated on 
the next request.
-        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0, true));
 
         // Trigger the next affinity mappings update. The outdated cache with 
custom affinity was added
         // to pending caches list and will be processed and cleared.
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index ccaa6cd15f1..6d6ff42c17e 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.client.thin.ThinClientEnpointsDiscoveryTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientNonTransactionalOperationsInTxTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest;
+import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessMultiDcTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
 import 
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
@@ -85,6 +86,7 @@ import org.junit.runners.Suite;
     ThinClientPartitionAwarenessResourceReleaseTest.class,
     ThinClientPartitionAwarenessDiscoveryTest.class,
     ThinClientPartitionAwarenessBalancingTest.class,
+    ThinClientPartitionAwarenessMultiDcTest.class,
     ThinClientNonTransactionalOperationsInTxTest.class,
     ReliableChannelTest.class,
     CacheAsyncTest.class,

Reply via email to