This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4d0ee3dc736 IGNITE-26909 Thin client: Optimizations for MultiDC -
Fixes #12533.
4d0ee3dc736 is described below
commit 4d0ee3dc736f0e4f63aec0342b6298f6db33af39
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Dec 2 14:37:06 2025 +0300
IGNITE-26909 Thin client: Optimizations for MultiDC - Fixes #12533.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../client/thin/ClientCacheAffinityContext.java | 97 ++++++-
.../client/thin/ClientCacheAffinityMapping.java | 94 +++++--
.../client/thin/ClientFieldsQueryPager.java | 19 ++
.../internal/client/thin/ClientOperation.java | 3 +
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/ReliableChannel.java | 45 +++-
.../internal/client/thin/TcpClientCache.java | 4 +-
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientMessageParser.java | 7 +
.../cache/ClientCachePartitionAwarenessGroup.java | 2 +-
.../client/cache/ClientCachePartitionMapping.java | 58 +++--
.../client/cache/ClientCachePartitionsRequest.java | 86 +++++--
.../ClientClusterGetDataCenterNodesRequest.java | 58 +++++
.../ClientClusterGetDataCenterNodesResponse.java | 59 +++++
.../org/apache/ignite/client/ReliabilityTest.java | 2 +-
.../ThinClientAbstractPartitionAwarenessTest.java | 16 ++
.../ThinClientPartitionAwarenessBalancingTest.java | 12 +-
.../ThinClientPartitionAwarenessMultiDcTest.java | 279 +++++++++++++++++++++
...lientPartitionAwarenessResourceReleaseTest.java | 4 +-
.../org/apache/ignite/client/ClientTestSuite.java | 2 +
20 files changed, 762 insertions(+), 95 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index de42b6532d3..9ac655460ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -17,10 +17,13 @@
package org.apache.ignite.internal.client.thin;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -32,6 +35,8 @@ import java.util.function.Predicate;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.client.ClientPartitionAwarenessMapper;
import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -46,6 +51,20 @@ public class ClientCacheAffinityContext {
/** If a factory needs to be removed. */
private static final long REMOVED_TS = 0;
+ /** Affinity operations, allowed to be executed on backup nodes. */
+ private static final EnumSet<ClientOperation> OPS_ALLOWED_ON_BACKUPS =
EnumSet.of(
+ ClientOperation.CACHE_GET,
+ ClientOperation.CACHE_CONTAINS_KEY,
+ ClientOperation.ATOMIC_LONG_VALUE_GET,
+ ClientOperation.OP_SET_VALUE_CONTAINS,
+ ClientOperation.OP_SET_VALUE_CONTAINS_ALL,
+ ClientOperation.OP_SET_ITERATOR_START,
+ ClientOperation.QUERY_SQL,
+ ClientOperation.QUERY_SQL_FIELDS,
+ ClientOperation.QUERY_SCAN,
+ ClientOperation.QUERY_INDEX
+ );
+
/**
* Factory for each cache id to produce key to partition mapping functions.
* This factory is also used to resolve cacheName from cacheId. If a cache
has default affinity mappings then
@@ -74,18 +93,24 @@ public class ClientCacheAffinityContext {
/** Predicate to check whether a connection to the node with the specified
ID is open. */
private final Predicate<UUID> connectionEstablishedPredicate;
+ /** Data center ID. */
+ private final String dataCenterId;
+
/**
* @param binary Binary data processor.
* @param factory Factory for caches with custom affinity.
+ * @param dataCenterId Data center ID.
*/
public ClientCacheAffinityContext(
IgniteBinary binary,
@Nullable ClientPartitionAwarenessMapperFactory factory,
- Predicate<UUID> connectionEstablishedPredicate
+ Predicate<UUID> connectionEstablishedPredicate,
+ String dataCenterId
) {
this.paMapFactory = factory;
this.binary = binary;
this.connectionEstablishedPredicate = connectionEstablishedPredicate;
+ this.dataCenterId = dataCenterId;
}
/**
@@ -150,7 +175,7 @@ public class ClientCacheAffinityContext {
// In case of IO error rq can hold previous mapping request. Just
overwrite it, we don't need it anymore.
rq = new CacheMappingRequest(cacheIds, lastAccessed);
- ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0);
+ ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0,
dataCenterId);
}
/**
@@ -221,6 +246,36 @@ public class ClientCacheAffinityContext {
return true;
}
+ /**
+ * @param ch Payload output channel.
+ */
+ public void writeDataCenterNodesRequest(PayloadOutputChannel ch) {
+ try (BinaryWriterEx w = BinaryUtils.writer(null, ch.out(), null)) {
+ w.writeString(dataCenterId);
+ }
+ }
+
+ /**
+ * @param ch Payload input channel.
+ */
+ public boolean readDataCenterNodesResponse(PayloadInputChannel ch) {
+ TopologyNodes top = lastTop.get();
+
+ if (top == null)
+ return false;
+
+ int cnt = ch.in().readInt();
+
+ List<UUID> dcNodes = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ dcNodes.add(new UUID(ch.in().readLong(), ch.in().readLong()));
+
+ top.dcNodes = dcNodes;
+
+ return true;
+ }
+
/**
* Gets last topology information.
*/
@@ -246,12 +301,15 @@ public class ClientCacheAffinityContext {
*
* @param cacheId Cache ID.
* @param key Key.
+ * @param op Client operation.
* @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and key.
*/
- public UUID affinityNode(int cacheId, Object key) {
+ public UUID affinityNode(int cacheId, Object key, ClientOperation op) {
ClientCacheAffinityMapping mapping = currentMapping();
- return mapping == null ? null : mapping.affinityNode(binary, cacheId,
key);
+ boolean primary = !OPS_ALLOWED_ON_BACKUPS.contains(op);
+
+ return mapping == null ? null : mapping.affinityNode(binary, cacheId,
key, primary);
}
/**
@@ -259,12 +317,24 @@ public class ClientCacheAffinityContext {
*
* @param cacheId Cache ID.
* @param part Partition.
+ * @param op Client operation.
* @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and partition.
*/
- public UUID affinityNode(int cacheId, int part) {
+ public UUID affinityNode(int cacheId, int part, ClientOperation op) {
ClientCacheAffinityMapping mapping = currentMapping();
- return mapping == null ? null : mapping.affinityNode(cacheId, part);
+ boolean primary = !OPS_ALLOWED_ON_BACKUPS.contains(op);
+
+ return mapping == null ? null : mapping.affinityNode(cacheId, part,
primary);
+ }
+
+ /**
+ * @return List of nodes in current data center.
+ */
+ public List<UUID> dataCenterNodes() {
+ TopologyNodes top = lastTop.get();
+
+ return top == null ? null : top.dataCenterNodes();
}
/**
@@ -314,6 +384,11 @@ public class ClientCacheAffinityContext {
}
}
+ /** */
+ public String dataCenterId() {
+ return dataCenterId;
+ }
+
/** */
private boolean isTopologyOutdated(TopologyNodes top,
AffinityTopologyVersion srvSideTopVer) {
if (top == null)
@@ -340,6 +415,9 @@ public class ClientCacheAffinityContext {
/** Nodes. */
private final Collection<UUID> nodes = new ConcurrentLinkedQueue<>();
+ /** Current data center nodes. */
+ private volatile List<UUID> dcNodes;
+
/**
* @param topVer Topology version.
* @param nodeId Node id.
@@ -357,6 +435,13 @@ public class ClientCacheAffinityContext {
return Collections.unmodifiableCollection(nodes);
}
+ /**
+ * Gets nodes of current data center.
+ */
+ public List<UUID> dataCenterNodes() {
+ return dcNodes;
+ }
+
/**
* @return Topology version.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
index ed25c44055c..4a944e0c691 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -31,12 +31,14 @@ import
org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.ClientPartitionAwarenessMapper;
import org.apache.ignite.internal.binary.BinaryReaderEx;
import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
+import static
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.DC_AWARE;
/**
* Affinity mapping (partition to nodes) for each cache.
@@ -44,16 +46,19 @@ import static
org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_
public class ClientCacheAffinityMapping {
/** CacheAffinityInfo for caches with not applicable partition awareness.
*/
private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
- new CacheAffinityInfo(null, null, null);
+ new CacheAffinityInfo(null, null, null, null);
+
+ /** Empty part to node mapping. */
+ private static final UUID[] EMPTY_PART_MAPPING = new UUID[0];
/** Topology version. */
private final AffinityTopologyVersion topVer;
/** Affinity information for each cache. */
- private final Map<Integer, CacheAffinityInfo> cacheAffinity = new
HashMap<>();
+ private final Map<Integer, CacheAffinityInfo> cacheAff = new HashMap<>();
/** Unmodifiable collection of cache IDs. To preserve instance
immutability. */
- private final Collection<Integer> cacheIds =
Collections.unmodifiableCollection(cacheAffinity.keySet());
+ private final Collection<Integer> cacheIds =
Collections.unmodifiableCollection(cacheAff.keySet());
/**
* @param topVer Topology version.
@@ -82,10 +87,11 @@ public class ClientCacheAffinityMapping {
* @param binary Binary data processor (needed to extract affinity field
from the key).
* @param cacheId Cache ID.
* @param key Key.
+ * @param primary Force primary node.
* @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and key.
*/
- public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
- CacheAffinityInfo affInfo = cacheAffinity.get(cacheId);
+ public UUID affinityNode(IgniteBinary binary, int cacheId, Object key,
boolean primary) {
+ CacheAffinityInfo affInfo = cacheAff.get(cacheId);
if (affInfo == null || affInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
return null;
@@ -105,7 +111,7 @@ public class ClientCacheAffinityMapping {
}
}
- return affInfo.nodeForKey(binaryKey);
+ return affInfo.nodeForKey(binaryKey, primary);
}
/**
@@ -113,15 +119,16 @@ public class ClientCacheAffinityMapping {
*
* @param cacheId Cache ID.
* @param part Partition.
+ * @param primary Force primary node.
* @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and partition.
*/
- public UUID affinityNode(int cacheId, int part) {
- CacheAffinityInfo affInfo = cacheAffinity.get(cacheId);
+ public UUID affinityNode(int cacheId, int part, boolean primary) {
+ CacheAffinityInfo affInfo = cacheAff.get(cacheId);
if (affInfo == null || affInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
return null;
- return affInfo.nodeForPartition(part);
+ return affInfo.nodeForPartition(part, primary);
}
/**
@@ -136,7 +143,7 @@ public class ClientCacheAffinityMapping {
assert res.topVer.equals(mapping.topVer) : "Mappings must have
identical topology versions [res.topVer=" +
res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
- res.cacheAffinity.putAll(mapping.cacheAffinity);
+ res.cacheAff.putAll(mapping.cacheAff);
}
return res;
@@ -148,8 +155,14 @@ public class ClientCacheAffinityMapping {
* @param ch Output channel.
* @param cacheIds Set of cache ids to request.
* @param customMappingsRequired {@code true} if non-default affinity
mappings required.
+ * @param dcId Data center ID.
*/
- public static void writeRequest(PayloadOutputChannel ch,
Collection<Integer> cacheIds, boolean customMappingsRequired) {
+ public static void writeRequest(
+ PayloadOutputChannel ch,
+ Collection<Integer> cacheIds,
+ boolean customMappingsRequired,
+ String dcId
+ ) {
ProtocolContext ctx = ch.clientChannel().protocolCtx();
if (customMappingsRequired &&
!ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
@@ -160,6 +173,12 @@ public class ClientCacheAffinityMapping {
if (ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
out.writeBoolean(customMappingsRequired);
+ if (ctx.isFeatureSupported(DC_AWARE)) {
+ try (BinaryWriterEx w = BinaryUtils.writer(null, out, null)) {
+ w.writeString(dcId);
+ }
+ }
+
out.writeInt(cacheIds.size());
for (int cacheId : cacheIds)
@@ -197,7 +216,15 @@ public class ClientCacheAffinityMapping {
for (int j = 0; j < cachesCnt; j++)
cacheKeyCfg.put(in.readInt(),
readCacheKeyConfiguration(in));
- UUID[] partToNode = readNodePartitions(in);
+ UUID[] primaryPartToNode = readNodePartitions(in);
+ UUID[] dcPartToNode = primaryPartToNode;
+
+ if
(ch.clientChannel().protocolCtx().isFeatureSupported(DC_AWARE)) {
+ dcPartToNode = readNodePartitions(in);
+
+ if (dcPartToNode.length == 0)
+ dcPartToNode = primaryPartToNode;
+ }
boolean dfltMapping = true;
@@ -212,13 +239,15 @@ public class ClientCacheAffinityMapping {
if (factory == null)
continue;
- aff.cacheAffinity.put(keyCfg.getKey(),
- new CacheAffinityInfo(keyCfg.getValue(),
partToNode, factory.apply(partToNode.length)));
+ aff.cacheAff.put(keyCfg.getKey(),
+ new CacheAffinityInfo(keyCfg.getValue(),
primaryPartToNode, dcPartToNode,
+ factory.apply(primaryPartToNode.length))
+ );
}
}
else { // Partition awareness is not applicable for these
caches.
for (int j = 0; j < cachesCnt; j++)
- aff.cacheAffinity.put(in.readInt(),
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
+ aff.cacheAff.put(in.readInt(),
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
}
}
@@ -249,6 +278,9 @@ public class ClientCacheAffinityMapping {
private static UUID[] readNodePartitions(BinaryReaderEx in) {
int nodesCnt = in.readInt();
+ if (nodesCnt == 0)
+ return EMPTY_PART_MAPPING;
+
int maxPart = -1;
UUID[] partToNode = new UUID[1024];
@@ -283,20 +315,30 @@ public class ClientCacheAffinityMapping {
/** Key configuration. */
private final Map<Integer, Integer> keyCfg;
- /** Partition mapping. */
- private final UUID[] partMapping;
+ /** Primary partition mapping. */
+ private final UUID[] primaryPartMapping;
+
+ /** Partition mapping to nodes located in current DC. */
+ private final UUID[] dcPartMapping;
/** Mapper a cache key to a partition. */
private final ClientPartitionAwarenessMapper keyMapper;
/**
* @param keyCfg Cache key configuration or {@code null} if partition
awareness is not applicable for this cache.
- * @param partMapping Partition to node mapping or {@code null} if
partition awareness is not applicable for
- * this cache.
+ * @param primaryPartMapping Primary partition to node mapping or
{@code null} if partition awareness
+ * is not applicable for this cache.
+ * @param dcPartMapping Partition to node mapping located in current
DC.
*/
- private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[]
partMapping, ClientPartitionAwarenessMapper keyMapper) {
+ private CacheAffinityInfo(
+ Map<Integer, Integer> keyCfg,
+ UUID[] primaryPartMapping,
+ UUID[] dcPartMapping,
+ ClientPartitionAwarenessMapper keyMapper
+ ) {
this.keyCfg = keyCfg;
- this.partMapping = partMapping;
+ this.primaryPartMapping = primaryPartMapping;
+ this.dcPartMapping = dcPartMapping;
this.keyMapper = keyMapper;
}
@@ -304,20 +346,24 @@ public class ClientCacheAffinityMapping {
* Calculates node for given key.
*
* @param key Key.
+ * @param primary Force primary node.
*/
- private UUID nodeForKey(Object key) {
+ private UUID nodeForKey(Object key, boolean primary) {
if (keyMapper == null)
return null;
- return nodeForPartition(keyMapper.partition(key));
+ return nodeForPartition(keyMapper.partition(key), primary);
}
/**
* Calculates node for given partition.
*
* @param part Partition.
+ * @param primary Force primary node.
*/
- private UUID nodeForPartition(int part) {
+ private UUID nodeForPartition(int part, boolean primary) {
+ UUID[] partMapping = primary ? primaryPartMapping : dcPartMapping;
+
if (part < 0 || partMapping == null || part >= partMapping.length)
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
index 157de864383..4116c851ab5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
@@ -38,6 +38,25 @@ class ClientFieldsQueryPager extends
GenericQueryPager<List<?>> implements Field
/** Serializer/deserializer. */
private final ClientUtils serDes;
+ /** Constructor. */
+ ClientFieldsQueryPager(
+ ReliableChannel ch,
+ @Nullable TcpClientTransaction tx,
+ ClientOperation qryOp,
+ ClientOperation pageQryOp,
+ Consumer<PayloadOutputChannel> qryWriter,
+ boolean keepBinary,
+ ClientBinaryMarshaller marsh,
+ int cacheId,
+ int partId
+ ) {
+ super(ch, tx, qryOp, pageQryOp, qryWriter, cacheId, partId);
+
+ this.keepBinary = keepBinary;
+
+ serDes = new ClientUtils(marsh);
+ }
+
/** Constructor. */
ClientFieldsQueryPager(
ReliableChannel ch,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 6f0f31e89f1..112dc47c712 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -207,6 +207,9 @@ public enum ClientOperation {
/** Get nodes endpoints. */
CLUSTER_GROUP_GET_NODE_ENDPOINTS(5102),
+ /** Get nodes of data center. */
+ CLUSTER_GET_DC_NODES(5103),
+
/** Execute compute task. */
COMPUTE_TASK_EXECUTE(6000),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index fb2508523a7..9277fdfd9ce 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -109,7 +109,10 @@ public enum ProtocolBitmaskFeature {
*
* @see GridCacheProcessor#dynamicStartCache(CacheConfiguration, String,
NearCacheConfiguration, boolean, boolean, boolean)
*/
- SQL_CACHE_CREATION(21);
+ SQL_CACHE_CREATION(21),
+
+ /** Data-center information. */
+ DC_AWARE(22);
/** */
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e03eb948781..cacc7aa5ed0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -41,6 +40,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
@@ -142,10 +142,16 @@ final class ReliableChannel implements AutoCloseable {
partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
+ String dcId =
IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
+ if (dcId == null && !F.isEmpty(clientCfg.getUserAttributes()))
+ dcId =
clientCfg.getUserAttributes().get(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
+
affinityCtx = new ClientCacheAffinityContext(
binary,
clientCfg.getPartitionAwarenessMapperFactory(),
- this::isConnectionEstablished
+ this::isConnectionEstablished,
+ dcId
);
discoveryCtx = new ClientDiscoveryContext(clientCfg);
@@ -191,7 +197,7 @@ final class ReliableChannel implements AutoCloseable {
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
- return service(op, payloadWriter, payloadReader,
Collections.emptyList());
+ return service(op, payloadWriter, payloadReader,
affinityCtx.dataCenterNodes());
}
/**
@@ -246,7 +252,14 @@ final class ReliableChannel implements AutoCloseable {
List<ClientConnectionException> failures
) {
try {
- ClientChannel ch = applyOnDefaultChannel(Function.identity(),
null, failures);
+ List<UUID> targetNodes = affinityCtx.dataCenterNodes();
+
+ ClientChannel ch = F.isEmpty(targetNodes)
+ ? applyOnDefaultChannel(Function.identity(), null, failures)
+ : applyOnNodeChannelWithFallback(
+
targetNodes.get(ThreadLocalRandom.current().nextInt(targetNodes.size())),
+ Function.identity(),
+ null);
applyOnClientChannelAsync(fut, ch, op, payloadWriter,
payloadReader, failures);
}
@@ -401,7 +414,7 @@ final class ReliableChannel implements AutoCloseable {
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
- UUID affNodeId = affinityCtx.affinityNode(cacheId, key);
+ UUID affNodeId = affinityCtx.affinityNode(cacheId, key, op);
if (affNodeId != null) {
return applyOnNodeChannelWithFallback(affNodeId, channel ->
@@ -423,7 +436,7 @@ final class ReliableChannel implements AutoCloseable {
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
- UUID affNodeId = affinityCtx.affinityNode(cacheId, part);
+ UUID affNodeId = affinityCtx.affinityNode(cacheId, part, op);
if (affNodeId != null) {
return applyOnNodeChannelWithFallback(affNodeId, channel ->
@@ -445,7 +458,7 @@ final class ReliableChannel implements AutoCloseable {
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
if (partitionAwarenessEnabled && affinityInfoIsUpToDate(cacheId)) {
- UUID affNodeId = affinityCtx.affinityNode(cacheId, key);
+ UUID affNodeId = affinityCtx.affinityNode(cacheId, key, op);
if (affNodeId != null) {
CompletableFuture<T> fut = new CompletableFuture<>();
@@ -505,10 +518,20 @@ final class ReliableChannel implements AutoCloseable {
if (lastTop != affinityCtx.lastTopology())
return false;
- Boolean result = applyOnNodeChannel(nodeId, channel ->
- channel.service(ClientOperation.CACHE_PARTITIONS,
- affinityCtx::writePartitionsUpdateRequest,
- affinityCtx::readPartitionsUpdateResponse),
+ Boolean result = applyOnNodeChannel(nodeId, channel ->
{
+ boolean updated =
channel.service(ClientOperation.CACHE_PARTITIONS,
+ affinityCtx::writePartitionsUpdateRequest,
+ affinityCtx::readPartitionsUpdateResponse);
+
+ if (updated && affinityCtx.dataCenterId() !=
null
+ &&
channel.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.DC_AWARE)) {
+
channel.service(ClientOperation.CLUSTER_GET_DC_NODES,
+
affinityCtx::writeDataCenterNodesRequest,
+
affinityCtx::readDataCenterNodesResponse);
+ }
+
+ return updated;
+ },
failures
);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index b24dbb2eefd..36eca1ca954 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -1387,7 +1387,9 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
ClientOperation.QUERY_SQL_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
- marsh
+ marsh,
+ cacheId,
+ qry.getPartitions().length >= 1 ? qry.getPartitions()[0] : -1
));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index f7115d41af3..3aebc1c58bc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -106,7 +106,10 @@ public enum ClientBitmaskFeature implements
ThinProtocolFeature {
* Set this flag required when creating caches during dump restoration and
similar processes.
* @see GridCacheProcessor#dynamicStartCache(CacheConfiguration, String,
NearCacheConfiguration, boolean, boolean, boolean)
*/
- SQL_CACHE_CREATION(21);
+ SQL_CACHE_CREATION(21),
+
+ /** Data-center information. */
+ DC_AWARE(22);
/** */
private static final EnumSet<ClientBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 1cba38e7d16..d0f54ef2136 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -77,6 +77,7 @@ import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSc
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
import
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest;
+import
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetDataCenterNodesRequest;
import
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetStateRequest;
import
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodeIdsRequest;
import
org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodesDetailsRequest;
@@ -313,6 +314,9 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
/** */
private static final short OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS = 5102;
+ /** */
+ private static final short OP_CLUSTER_GET_DATA_CENTER_NODES = 5103;
+
/* Compute operations. */
/** */
private static final short OP_COMPUTE_TASK_EXECUTE = 6000;
@@ -641,6 +645,9 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
case OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS:
return new ClientClusterGroupGetNodesEndpointsRequest(reader);
+ case OP_CLUSTER_GET_DATA_CENTER_NODES:
+ return new ClientClusterGetDataCenterNodesRequest(reader);
+
case OP_COMPUTE_TASK_EXECUTE:
return new ClientExecuteTaskRequest(reader);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 333cc8b3f2b..3a430affdc5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -74,7 +74,7 @@ class ClientCachePartitionAwarenessGroup {
writeCacheKeyConfiguration(writer, proc,
entry.getValue().getKeyConfiguration());
}
- mapping.write(writer);
+ mapping.write(cpctx, writer);
if
(cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
writer.writeBoolean(dfltAffinity);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
index 8a9c641812b..66978ed1767 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
@@ -17,45 +17,60 @@
package org.apache.ignite.internal.processors.platform.client.cache;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.binary.BinaryRawWriter;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import
org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
+import
org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
+import org.jetbrains.annotations.Nullable;
/**
* Cache partition mapping.
*/
public class ClientCachePartitionMapping {
- /** Partitions map for caches. */
- private final HashMap<UUID, Set<Integer>> partitionMap;
+ /** Primary partitions map for caches. */
+ private final Map<UUID, Set<Integer>> primaryPartitionMap;
+
+ /** Partitions map to nodes located in current data center for caches. */
+ @Nullable private final Map<UUID, Set<Integer>> dcPartitionMap;
/**
- * @param assignment Affinity assignment.
+ * @param primaryPartitionMap Primary partition mapping.
+ * @param dcPartitionMap Partition mapping to nodes located in current
data center.
*/
- public ClientCachePartitionMapping(AffinityAssignment assignment) {
- Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
-
- partitionMap = new HashMap<>(nodes.size());
-
- for (ClusterNode node : nodes) {
- UUID nodeId = node.id();
- Set<Integer> parts = assignment.primaryPartitions(nodeId);
-
- partitionMap.put(nodeId, parts);
- }
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public ClientCachePartitionMapping(
+ Map<UUID, Set<Integer>> primaryPartitionMap,
+ @Nullable Map<UUID, Set<Integer>> dcPartitionMap
+ ) {
+ this.primaryPartitionMap = primaryPartitionMap;
+ this.dcPartitionMap = dcPartitionMap;
}
/**
* Write mapping using binary writer.
* @param writer Writer.
*/
- public void write(BinaryRawWriter writer) {
+ public void write(ClientProtocolContext ctx, BinaryRawWriter writer) {
+ writePartitionMap(writer, primaryPartitionMap);
+
+ if (ctx.isFeatureSupported(ClientBitmaskFeature.DC_AWARE))
+ writePartitionMap(writer, dcPartitionMap);
+ }
+
+ /** */
+ private static void writePartitionMap(BinaryRawWriter writer, @Nullable
Map<UUID, Set<Integer>> partitionMap) {
+ if (partitionMap == null) {
+ writer.writeInt(0);
+
+ return;
+ }
+
writer.writeInt(partitionMap.size());
- for (HashMap.Entry<UUID, Set<Integer>> nodeParts:
partitionMap.entrySet()) {
+ for (Map.Entry<UUID, Set<Integer>> nodeParts: partitionMap.entrySet())
{
UUID nodeUuid = nodeParts.getKey();
Set<Integer> parts = nodeParts.getValue();
@@ -77,11 +92,12 @@ public class ClientCachePartitionMapping {
ClientCachePartitionMapping mapping = (ClientCachePartitionMapping)o;
- return Objects.equals(partitionMap, mapping.partitionMap);
+ return Objects.equals(primaryPartitionMap, mapping.primaryPartitionMap)
+ && Objects.equals(dcPartitionMap, mapping.dcPartitionMap);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- return Objects.hash(partitionMap);
+ return Objects.hash(primaryPartitionMap, dcPartitionMap);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
index 0fbf71b3277..152d904f8b1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
@@ -20,14 +20,18 @@ package
org.apache.ignite.internal.processors.platform.client.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.cache.CacheType;
@@ -59,6 +63,9 @@ public class ClientCachePartitionsRequest extends
ClientRequest {
*/
private final boolean withCustomMappings;
+ /** Data center ID. */
+ private final String dcId;
+
/**
* Initializes a new instance of ClientRawRequest class.
* @param reader Reader.
@@ -71,6 +78,11 @@ public class ClientCachePartitionsRequest extends
ClientRequest {
else
withCustomMappings = false;
+ if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.DC_AWARE))
+ dcId = reader.readString();
+ else
+ dcId = null;
+
int len = reader.readInt();
cacheIds = new int[len];
@@ -131,28 +143,21 @@ public class ClientCachePartitionsRequest extends
ClientRequest {
/**
* Process cache and create new partition mapping, if it does not belong
to any existent.
* @param ctx Connection context.
- * @param affinityVer Affinity topology version.
+ * @param affVer Affinity topology version.
* @param cacheDesc Cache descriptor.
* @param withCustomMappings {@code true} to verify a non-default affinity
function also.
* @return Null if cache was processed and new client cache partition
awareness group if it does not belong to any
* existent.
*/
- private static ClientCachePartitionAwarenessGroup processCache(
+ private ClientCachePartitionAwarenessGroup processCache(
ClientConnectionContext ctx,
- ClientAffinityTopologyVersion affinityVer,
+ ClientAffinityTopologyVersion affVer,
DynamicCacheDescriptor cacheDesc,
boolean withCustomMappings
) {
- AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer,
cacheDesc.cacheId());
-
- // If assignment is not available for the cache for required affinity
version, ignore the cache.
- if (assignment == null)
- return null;
-
- ClientCachePartitionMapping mapping = null;
-
- if (isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings))
- mapping = new ClientCachePartitionMapping(assignment);
+ ClientCachePartitionMapping mapping =
isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings)
+ ? getCachePartitionMapping(ctx, affVer, cacheDesc.cacheId(), dcId)
+ : null;
return new ClientCachePartitionAwarenessGroup(mapping,
!withCustomMappings ||
isDefaultMapping(cacheDesc.cacheConfiguration()));
@@ -161,15 +166,58 @@ public class ClientCachePartitionsRequest extends
ClientRequest {
/**
* Get assignment for a cache in a safe way.
* @param ctx Client connection context.
- * @param affinityVer Affinity version.
+ * @param affVer Affinity version.
* @param cacheId Cache ID.
- * @return Affinity assignment for a cache, or null if is not possible to
get.
+ * @param dcId Data center ID.
+ * @return Partition mapping for a cache, or null if it is not possible to
get.
*/
- @Nullable private static AffinityAssignment
getCacheAssignment(ClientConnectionContext ctx,
- ClientAffinityTopologyVersion affinityVer, int cacheId) {
+ @Nullable private static ClientCachePartitionMapping
getCachePartitionMapping(
+ ClientConnectionContext ctx,
+ ClientAffinityTopologyVersion affVer,
+ int cacheId,
+ String dcId
+ ) {
try {
- GridCacheContext cacheCtx =
ctx.kernalContext().cache().context().cacheContext(cacheId);
- return cacheCtx.affinity().assignment(affinityVer.getVersion());
+ GridCacheContext<?, ?> cacheCtx =
ctx.kernalContext().cache().context().cacheContext(cacheId);
+
+ AffinityAssignment assignment =
cacheCtx.affinity().assignment(affVer.getVersion());
+
+ Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
+
+ Map<UUID, Set<Integer>> primaryPartitionMap = new
HashMap<>(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ UUID nodeId = node.id();
+ Set<Integer> parts = assignment.primaryPartitions(nodeId);
+
+ primaryPartitionMap.put(nodeId, parts);
+ }
+
+ Map<UUID, Set<Integer>> dcPartitionMap = null;
+
+ if (dcId != null && cacheCtx.config().isReadFromBackup()
+ && cacheCtx.config().getWriteSynchronizationMode() !=
CacheWriteSynchronizationMode.PRIMARY_SYNC) {
+ // Filter partitions, located in current DC.
+ dcPartitionMap = new HashMap<>();
+
+ List<List<ClusterNode>> partAssignments =
assignment.assignment();
+
+ for (int p = 0; p < partAssignments.size(); p++) {
+ List<ClusterNode> partAssignment = partAssignments.get(p);
+
+ ClusterNode node = F.find(partAssignment, null, n ->
dcId.equals(n.dataCenterId()));
+
+ if (node != null)
+ dcPartitionMap.computeIfAbsent(node.id(), id -> new
HashSet<>()).add(p);
+ else {
+ dcPartitionMap = null;
+
+ break;
+ }
+ }
+ }
+
+ return new ClientCachePartitionMapping(primaryPartitionMap,
dcPartitionMap);
}
catch (Exception e) {
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
new file mode 100644
index 00000000000..95ab92e41fb
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cluster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cluster.ClusterNode;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Cluster get data center nodes request.
+ */
+public class ClientClusterGetDataCenterNodesRequest extends ClientRequest {
+ /** */
+ private final String dcId;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientClusterGetDataCenterNodesRequest(BinaryRawReader reader) {
+ super(reader);
+
+ dcId = reader.readString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ if (dcId == null)
+ throw new IllegalArgumentException("Data center ID cannot be
null");
+
+ Collection<ClusterNode> srvNodes =
ctx.kernalContext().discovery().aliveServerNodes();
+
+ Collection<UUID> nodeIds = F.viewReadOnly(srvNodes, ClusterNode::id, n
-> dcId.equals(n.dataCenterId()));
+
+ return new ClientClusterGetDataCenterNodesResponse(requestId(),
nodeIds);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
new file mode 100644
index 00000000000..ac89a1f276d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cluster/ClientClusterGetDataCenterNodesResponse.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cluster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.binary.BinaryWriterEx;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Cluster get data center nodes response.
+ */
+public class ClientClusterGetDataCenterNodesResponse extends ClientResponse {
+ /** Node ids. */
+ private final Collection<UUID> nodeIds;
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param nodeIds Node ids.
+ */
+ public ClientClusterGetDataCenterNodesResponse(long reqId,
Collection<UUID> nodeIds) {
+ super(reqId);
+ this.nodeIds = nodeIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryWriterEx
writer) {
+ super.encode(ctx, writer);
+
+ int cnt = 0;
+ int pos = writer.reserveInt();
+
+ for (UUID nodeId: nodeIds) {
+ cnt++;
+ writer.writeLong(nodeId.getMostSignificantBits());
+ writer.writeLong(nodeId.getLeastSignificantBits());
+ }
+
+ writer.writeInt(pos, cnt);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 86e00d30137..67206d56b0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -356,7 +356,7 @@ public class ReliabilityTest extends AbstractThinClientTest
{
String nullOpsNames =
nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCnt = 23;
+ long expectedNullCnt = 24;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding
new codes, update ClientOperationType too. Missing ops: "
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 8430fc5ff81..6bc4c3dec45 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
@@ -88,6 +89,9 @@ public abstract class
ThinClientAbstractPartitionAwarenessTest extends GridCommo
/** Channels. */
protected final TestTcpClientChannel[] channels = new
TestTcpClientChannel[MAX_CLUSTER_SIZE];
+ /** Channels list. */
+ protected final List<TestTcpClientChannel> channelsList =
Collections.unmodifiableList(F.asList(channels));
+
/** Operations queue. */
protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue
= new ConcurrentLinkedQueue<>();
@@ -183,6 +187,18 @@ public abstract class
ThinClientAbstractPartitionAwarenessTest extends GridCommo
}
}
+ /**
+ * Next operation channel index.
+ */
+ protected int nextOpChannelIdx() {
+ T2<TestTcpClientChannel, ClientOperation> nextOp = opsQueue.poll();
+
+ if (nextOp == null)
+ return -1;
+
+ return channelsList.indexOf(nextOp.get1());
+ }
+
/**
* Calculates affinity channel for cache and key.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
index f89dffe8c3f..903ea7dddd3 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
@@ -18,9 +18,6 @@
package org.apache.ignite.internal.client.thin;
import java.util.BitSet;
-import java.util.List;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.junit.Test;
/**
@@ -39,12 +36,13 @@ public class ThinClientPartitionAwarenessBalancingTest
extends ThinClientAbstrac
for (int i = 0; i < 100; i++)
client.cacheNames(); // Non-affinity requests should be randomly
distributed among connections.
- List<TestTcpClientChannel> channelList = F.asList(channels);
+ while (true) {
+ int channelIdx = nextOpChannelIdx();
- while (!opsQueue.isEmpty()) {
- T2<TestTcpClientChannel, ClientOperation> op = opsQueue.poll();
+ if (channelIdx < 0)
+ break;
- usedConnections.set(channelList.indexOf(op.get1()));
+ usedConnections.set(channelIdx);
}
assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 =
set of {0, 1, 2}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
new file mode 100644
index 00000000000..e35025ee6e2
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessMultiDcTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import
org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Test partition awareness of thin client in multi data-center environment.
+ */
+public class ThinClientPartitionAwarenessMultiDcTest extends
ThinClientAbstractPartitionAwarenessTest {
+ /** */
+ private static final String DC1 = "dc1";
+
+ /** */
+ private static final String DC2 = "dc2";
+
+ /** */
+ private static final String DC3 = "dc3";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid(0, DC1);
+ startGrid(1, DC1);
+
+ startGrid(2, DC2);
+ startGrid(3, DC2);
+ }
+
+ /** */
+ private IgniteEx startGrid(int idx, String dcId) throws Exception {
+ return startGrid(getConfiguration(getTestIgniteInstanceName(idx))
+ .setClientConnectorConfiguration(new ClientConnectorConfiguration()
+ .setThinClientConfiguration(new ThinClientConfiguration()
+ .setMaxActiveComputeTasksPerConnection(1)))
+
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
dcId)));
+ }
+
+ /** */
+ private void initClient(
+ ClientConfiguration clientCfg,
+ String dcId,
+ int... chIdxs
+ ) throws IgniteInterruptedCheckedException {
+
initClient(clientCfg.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID,
dcId)), chIdxs);
+ }
+
+ /** */
+ @Test
+ public void testPartitionAwarenessRequests() throws Exception {
+ List<IgniteEx> curDcNodes = F.asList(grid(0), grid(1));
+ List<IgniteEx> anotherDcNodes = F.asList(grid(2), grid(3));
+
+ initClient(getClientConfiguration(0), DC1, 0, 1, 2, 3);
+
+ checkDcAwarePaRequests("test0",
CacheWriteSynchronizationMode.PRIMARY_SYNC, true,
+ curDcNodes, anotherDcNodes, false);
+
+ checkDcAwarePaRequests("test1",
CacheWriteSynchronizationMode.FULL_SYNC, false,
+ curDcNodes, anotherDcNodes, false);
+
+ checkDcAwarePaRequests("test2",
CacheWriteSynchronizationMode.FULL_SYNC, true,
+ curDcNodes, anotherDcNodes, true);
+
+ checkDcAwarePaRequests("test3",
CacheWriteSynchronizationMode.FULL_ASYNC, true,
+ curDcNodes, anotherDcNodes, true);
+ }
+
+ /** */
+ @Test
+ public void testPartitionAwarenessRequestsNoNodesInDc() throws Exception {
+ initClient(getClientConfiguration(0), DC3, 0, 1, 2, 3);
+
+ checkDcAwarePaRequests("test4",
CacheWriteSynchronizationMode.FULL_SYNC, true,
+ Collections.emptyList(), F.asList(grid(0), grid(1), grid(2),
grid(3)), false);
+ }
+
+ /** */
+ private void checkDcAwarePaRequests(
+ String cacheName,
+ CacheWriteSynchronizationMode mode,
+ boolean readFromBackup,
+ List<IgniteEx> curDcNodes,
+ List<IgniteEx> otherDcNodes,
+ boolean expReqToBackup
+ ) throws Exception {
+ grid(0).createCache(new CacheConfiguration<>(cacheName)
+ .setBackups(1)
+ .setWriteSynchronizationMode(mode)
+ .setReadFromBackup(readFromBackup)
+ .setAffinity(new
RendezvousAffinityFunction().setAffinityBackupFilter(
+ new
ClusterNodeAttributeAffinityBackupFilter(IgniteSystemProperties.IGNITE_DATA_CENTER_ID)))
+ );
+
+ ClientCache<Object, Object> cache = client.cache(cacheName);
+
+ // Init partitions request.
+ cache.put(0, 0);
+
+ // Ignore first put and partitions request.
+ opsQueue.clear();
+
+ Set<Integer> curDcNodeIdxs = new HashSet<>();
+
+ for (IgniteEx primaryNode : curDcNodes) {
+ curDcNodeIdxs.add(getTestIgniteInstanceIndex(primaryNode.name()));
+
+ List<Integer> keys = primaryKeys(primaryNode.cache(cacheName), 10);
+
+ for (Integer key : keys) {
+ int primaryNodeIdx =
getTestIgniteInstanceIndex(primaryNode.name());
+
+ // If primary in current DC, read and write requests always
sent to primary node.
+ cache.put(key, 0);
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_PUT);
+
+ cache.putAsync(key, 0).get();
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_PUT);
+
+ cache.get(key);
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_GET);
+
+ cache.getAsync(key).get();
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_GET);
+ }
+ }
+
+ for (IgniteEx primaryNode : otherDcNodes) {
+ List<Integer> keys = primaryKeys(primaryNode.cache(cacheName), 10);
+
+ for (Integer key : keys) {
+ int primaryNodeIdx =
getTestIgniteInstanceIndex(primaryNode.name());
+
+ // If primary in another DC, write requests always sent to
primary node.
+ cache.put(key, 0);
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_PUT);
+
+ cache.putAsync(key, 0).get();
+
+ assertOpOnChannel(channels[primaryNodeIdx],
ClientOperation.CACHE_PUT);
+
+ int backupNodeIdx = getTestIgniteInstanceIndex(backupNode(key,
cacheName).name());
+
+ assertTrue(curDcNodeIdxs.isEmpty() ||
curDcNodeIdxs.contains(backupNodeIdx));
+
+ // But read requests can be sent to current DC backup node.
+ cache.get(key);
+
+ assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx :
primaryNodeIdx], ClientOperation.CACHE_GET);
+
+ cache.getAsync(key).get();
+
+ assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx :
primaryNodeIdx], ClientOperation.CACHE_GET);
+
+ cache.containsKey(key);
+
+ assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx :
primaryNodeIdx], ClientOperation.CACHE_CONTAINS_KEY);
+
+ int partIdx =
primaryNode.context().affinity().partition(cacheName, key);
+
+ cache.query(new ScanQuery<>().setPartition(partIdx)).getAll();
+
+ assertOpOnChannel(channels[expReqToBackup ? backupNodeIdx :
primaryNodeIdx], ClientOperation.QUERY_SCAN);
+ }
+ }
+ }
+
+ /** */
+ @Test
+ public void testNonPartitionAwarenessRequests() throws Exception {
+ String cacheName = "txCache0";
+
+ grid(0).createCache(new
CacheConfiguration<>(cacheName).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ checkDcAwareNonPaRequests(cacheName, DC1, 0, 1);
+ checkDcAwareNonPaRequests(cacheName, DC2, 2, 3);
+ }
+
+ /** */
+ @Test
+ public void testNonPartitionAwarenessRequestsNoNodesInDc() throws
Exception {
+ String cacheName = "txCache1";
+
+ grid(0).createCache(new
CacheConfiguration<>(cacheName).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ checkDcAwareNonPaRequests(cacheName, DC3, 0, 1, 2, 3);
+ }
+
+ /** */
+ private void checkDcAwareNonPaRequests(String cacheName, String dcId,
int... expNodeIdxs) throws Exception {
+ initClient(getClientConfiguration(0), dcId, 0, 1, 2, 3);
+
+ ClientCache<Object, Object> cache = client.cache(cacheName);
+
+ cache.put(0, 0);
+
+ opsQueue.clear();
+
+ for (int i = 0; i < 10; i++) {
+ cache.query(new ScanQuery<>()).getAll();
+
+ assertExpectedChannel(expNodeIdxs);
+
+ client.cluster().state();
+
+ assertExpectedChannel(expNodeIdxs);
+
+ client.compute().executeAsync2(TestTask.class.getName(),
null).get();
+
+ assertExpectedChannel(expNodeIdxs);
+
+ try (ClientTransaction tx = client.transactions().txStart()) {
+ cache.put(ThreadLocalRandom.current().nextInt(10), 0);
+ cache.get(ThreadLocalRandom.current().nextInt(10));
+
+ tx.commit();
+ }
+
+ while (true) {
+ int channelIdx = nextOpChannelIdx();
+
+ if (channelIdx < 0)
+ break;
+
+ assertTrue(F.contains(expNodeIdxs, channelIdx));
+ }
+ }
+ }
+
+ /** */
+ private void assertExpectedChannel(int... expChannelIdxs) {
+ int channelIdx = nextOpChannelIdx();
+
+ assertTrue(F.contains(expChannelIdxs, channelIdx));
+
+ assertTrue("Ops queue not empty: " + opsQueue, F.isEmpty(opsQueue));
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 6de86591c82..6ddc0833435 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -101,7 +101,7 @@ public class
ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
awaitPartitionMapExchange();
// Cache destroyed, but mappings still exist on the client side.
- assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId,
Integer.valueOf(0)));
+ assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId,
Integer.valueOf(0), ClientOperation.CACHE_PUT));
client.cache(PART_CACHE_NAME).put(1, 1);
@@ -116,7 +116,7 @@ public class
ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
}, 5_000L));
// Mapping for previous caches become outdated and will be updated on
the next request.
- assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+ assertNull(affCtx.currentMapping().affinityNode(cacheId, 0, true));
// Trigger the next affinity mappings update. The outdated cache with
custom affinity was added
// to pending caches list and will be processed and cleared.
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index ccaa6cd15f1..6d6ff42c17e 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -44,6 +44,7 @@ import
org.apache.ignite.internal.client.thin.ThinClientEnpointsDiscoveryTest;
import
org.apache.ignite.internal.client.thin.ThinClientNonTransactionalOperationsInTxTest;
import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest;
import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest;
+import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessMultiDcTest;
import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
import
org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
@@ -85,6 +86,7 @@ import org.junit.runners.Suite;
ThinClientPartitionAwarenessResourceReleaseTest.class,
ThinClientPartitionAwarenessDiscoveryTest.class,
ThinClientPartitionAwarenessBalancingTest.class,
+ ThinClientPartitionAwarenessMultiDcTest.class,
ThinClientNonTransactionalOperationsInTxTest.class,
ReliableChannelTest.class,
CacheAsyncTest.class,