This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d72a123 IGNITE-11898 Java thin client: Affinity awareness support -
Fixes #6980.
d72a123 is described below
commit d72a123e122ffe1b4f715f98c2db5d79293f0c90
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Nov 12 17:57:42 2019 +0300
IGNITE-11898 Java thin client: Affinity awareness support - Fixes #6980.
---
.../ignite/configuration/ClientConfiguration.java | 73 +++++
.../ignite/internal/client/thin/ClientBinary.java | 3 +-
.../client/thin/ClientCacheAffinityContext.java | 227 ++++++++++++++
.../client/thin/ClientCacheAffinityMapping.java | 269 ++++++++++++++++
.../ignite/internal/client/thin/ClientChannel.java | 17 +
.../client/thin/ClientChannelConfiguration.java | 70 +++--
.../internal/client/thin/ClientOperation.java | 1 +
.../internal/client/thin/ReliableChannel.java | 344 +++++++++++++++++----
.../apache/ignite/internal/client/thin/Result.java | 60 ----
.../internal/client/thin/TcpClientCache.java | 117 ++++---
.../internal/client/thin/TcpClientChannel.java | 40 ++-
.../internal/client/thin/TcpIgniteClient.java | 23 +-
.../org/apache/ignite/client/ReliabilityTest.java | 46 +++
.../ThinClientAbstractAffinityAwarenessTest.java | 322 +++++++++++++++++++
...nClientAffinityAwarenessStableTopologyTest.java | 206 ++++++++++++
...lientAffinityAwarenessUnstableTopologyTest.java | 220 +++++++++++++
.../org/apache/ignite/client/ClientTestSuite.java | 6 +-
17 files changed, 1826 insertions(+), 218 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 3949919..a4fe74c 100644
---
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -93,6 +93,25 @@ public final class ClientConfiguration implements
Serializable {
private ClientTransactionConfiguration txCfg = new
ClientTransactionConfiguration();
/**
+ * Whether affinity awareness should be enabled.
+ *
+ * When {@code true} client attempts to send the request directly to the
primary node for the given cache key.
+ * To do so, connection is established to every known server node.
+ * By default ({@code false}) only one connection is established at a given
moment, to a random server node.
+ */
+ private boolean affinityAwarenessEnabled;
+
+ /**
+ * Reconnect throttling period (in milliseconds). No more than
{@code reconnectThrottlingRetries}
+ * attempts to reconnect will be made within {@code
reconnectThrottlingPeriod} in case of connection loss.
+ * Throttling is disabled if either {@code reconnectThrottlingRetries} or
{@code reconnectThrottlingPeriod} is 0.
+ */
+ private long reconnectThrottlingPeriod = 30_000L;
+
+ /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
+ private int reconnectThrottlingRetries = 3;
+
+ /**
* @return Host addresses.
*/
public String[] getAddresses() {
@@ -416,6 +435,60 @@ public final class ClientConfiguration implements
Serializable {
return this;
}
+ /**
+ * @return Whether affinity awareness should be enabled.
+ */
+ public boolean isAffinityAwarenessEnabled() {
+ return affinityAwarenessEnabled;
+ }
+
+ /**
+ * Enable or disable affinity awareness.
+ *
+ * @return {@code this} for chaining.
+ */
+ public ClientConfiguration setAffinityAwarenessEnabled(boolean
affinityAwarenessEnabled) {
+ this.affinityAwarenessEnabled = affinityAwarenessEnabled;
+
+ return this;
+ }
+
+ /**
+ * Gets reconnect throttling period.
+ */
+ public long getReconnectThrottlingPeriod() {
+ return reconnectThrottlingPeriod;
+ }
+
+ /**
+ * Sets reconnect throttling period.
+ *
+ * @return {@code this} for chaining.
+ */
+ public ClientConfiguration setReconnectThrottlingPeriod(long
reconnectThrottlingPeriod) {
+ this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
+
+ return this;
+ }
+
+ /**
+ * Gets reconnect throttling retries.
+ */
+ public int getReconnectThrottlingRetries() {
+ return reconnectThrottlingRetries;
+ }
+
+ /**
+ * Sets reconnect throttling retries.
+ *
+ * @return {@code this} for chaining.
+ */
+ public ClientConfiguration setReconnectThrottlingRetries(int
reconnectThrottlingRetries) {
+ this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ClientConfiguration.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
index 4655cbf..8f381a6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
@@ -27,6 +27,7 @@ import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
@@ -54,7 +55,7 @@ class ClientBinary implements IgniteBinary {
if (obj == null)
return null;
- if (obj instanceof IgniteBinary)
+ if (BinaryUtils.isBinaryType(obj.getClass()))
return (T)obj;
byte[] objBytes = marsh.marshal(obj);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
new file mode 100644
index 0000000..29bff6e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+
+/**
+ * Client cache affinity awareness context.
+ */
+public class ClientCacheAffinityContext {
+ /** Binary data processor. */
+ private final IgniteBinary binary;
+
+ /** Contains last topology version and known nodes of this version. */
+ private final AtomicReference<TopologyNodes> lastTop = new
AtomicReference<>();
+
+ /** Current affinity mapping. */
+ private volatile ClientCacheAffinityMapping affinityMapping;
+
+ /** Cache IDs, which should be included in the next affinity mapping
request. */
+ private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+
+ /**
+ * @param binary Binary data processor.
+ */
+ public ClientCacheAffinityContext(IgniteBinary binary) {
+ this.binary = binary;
+ }
+
+ /**
+ * Update topology version if it's greater than current version and store
nodes for last topology.
+ *
+ * If {@code topVer} is equal to the stored version, {@code nodeId} is added
+ * to the nodes of the stored topology and nothing else changes. If
+ * {@code topVer} is less than the stored version, the call has no effect.
+ *
+ * @param topVer Topology version.
+ * @param nodeId Node id.
+ * @return {@code True} if last topology was updated to the new version.
+ */
+ public boolean updateLastTopologyVersion(AffinityTopologyVersion topVer,
UUID nodeId) {
+ // Loop until either the holder is replaced or the version turns out
+ // to be not newer than the stored one.
+ while (true) {
+ TopologyNodes lastTop = this.lastTop.get();
+
+ if (lastTop == null || topVer.compareTo(lastTop.topVer) > 0) {
+ // Newer (or first) topology: replace the holder. If the CAS
+ // fails the reference has changed, so re-read and re-evaluate.
+ if (this.lastTop.compareAndSet(lastTop, new
TopologyNodes(topVer, nodeId)))
+ return true;
+ }
+ else if (topVer.equals(lastTop.topVer)) {
+ // Same version: remember the node in the current holder.
+ lastTop.nodes.add(nodeId);
+
+ return false;
+ }
+ else
+ return false;
+ }
+ }
+
+ /**
+ * Is affinity update required for given cache.
+ *
+ * As a side effect, the cache id is added to the pending cache IDs in
+ * every case except when the current mapping is not older than the last
+ * topology and already contains this cache.
+ *
+ * @param cacheId Cache id.
+ * @return {@code true} if there is no mapping yet, the mapping is older
+ * than the last known topology, or the mapping does not contain the
+ * cache; {@code false} if the last topology is unknown or the mapping
+ * is current and contains the cache.
+ */
+ public boolean affinityUpdateRequired(int cacheId) {
+ TopologyNodes top = lastTop.get();
+
+ if (top == null) { // Don't know current topology.
+ pendingCacheIds.add(cacheId);
+
+ return false;
+ }
+
+ ClientCacheAffinityMapping mapping = affinityMapping;
+
+ // No mapping has been stored yet.
+ if (mapping == null) {
+ pendingCacheIds.add(cacheId);
+
+ return true;
+ }
+
+ // Stored mapping is for an older topology version.
+ if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+ pendingCacheIds.add(cacheId);
+
+ return true;
+ }
+
+ if (mapping.cacheIds().contains(cacheId))
+ return false;
+ else {
+ // Mapping is current but has no entry for this cache.
+ pendingCacheIds.add(cacheId);
+
+ return true;
+ }
+ }
+
+ /**
+ * @param ch Payload output channel.
+ */
+ public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
+ ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+ }
+
+ /**
+ * Reads an affinity mapping response and applies it to this context.
+ *
+ * A mapping with a greater topology version than the current one replaces
+ * it; a mapping with the same version is merged into it; a mapping with
+ * a lower version is read but not applied.
+ *
+ * @param ch Payload input channel.
+ * @return {@code false} if the last topology is unknown (nothing is read
+ * from the channel in this case), {@code true} otherwise.
+ */
+ public synchronized boolean
readPartitionsUpdateResponse(PayloadInputChannel ch) {
+ if (lastTop.get() == null)
+ return false;
+
+ ClientCacheAffinityMapping newMapping =
ClientCacheAffinityMapping.readResponse(ch);
+
+ ClientCacheAffinityMapping oldMapping = affinityMapping;
+
+ if (oldMapping == null ||
newMapping.topologyVersion().compareTo(oldMapping.topologyVersion()) > 0) {
+ affinityMapping = newMapping;
+
+ // Caches known to the replaced mapping become pending again,
+ // except those already covered by the new mapping.
+ if (oldMapping != null)
+ pendingCacheIds.addAll(oldMapping.cacheIds());
+
+ pendingCacheIds.removeAll(newMapping.cacheIds());
+
+ return true;
+ }
+
+ if (newMapping.topologyVersion().equals(oldMapping.topologyVersion()))
{
+ affinityMapping = ClientCacheAffinityMapping.merge(oldMapping,
newMapping);
+
+ pendingCacheIds.removeAll(newMapping.cacheIds());
+
+ return true;
+ }
+
+ // Obsolete mapping.
+ // NOTE(review): still returns true although nothing was applied -
+ // confirm callers expect this.
+ return true;
+ }
+
+ /**
+ * Gets last topology information.
+ */
+ public TopologyNodes lastTopology() {
+ return lastTop.get();
+ }
+
+ /**
+ * Resets affinity context.
+ *
+ * @param top Topology which triggers reset.
+ */
+ public synchronized void reset(TopologyNodes top) {
+ if (lastTop.compareAndSet(top, null)) {
+ affinityMapping = null;
+
+ pendingCacheIds.clear();
+ }
+ }
+
+ /**
+ * Calculates affinity node for given cache and key.
+ *
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and key.
+ */
+ public UUID affinityNode(int cacheId, Object key) {
+ TopologyNodes top = lastTop.get();
+
+ if (top == null)
+ return null;
+
+ ClientCacheAffinityMapping mapping = affinityMapping;
+
+ if (mapping == null)
+ return null;
+
+ if (top.topVer.compareTo(mapping.topologyVersion()) > 0)
+ return null;
+
+ return mapping.affinityNode(binary, cacheId, key);
+ }
+
+ /**
+ * Holder for list of nodes for topology version.
+ */
+ static class TopologyNodes {
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Nodes. */
+ private final Collection<UUID> nodes = new ConcurrentLinkedQueue<>();
+
+ /**
+ * @param topVer Topology version.
+ * @param nodeId Node id.
+ */
+ private TopologyNodes(AffinityTopologyVersion topVer, UUID nodeId) {
+ this.topVer = topVer;
+
+ nodes.add(nodeId);
+ }
+
+ /**
+ * Gets nodes of this topology.
+ */
+ public Iterable<UUID> nodes() {
+ return Collections.unmodifiableCollection(nodes);
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
new file mode 100644
index 0000000..84e4074
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.internal.binary.BinaryObjectExImpl;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Affinity mapping (partition to nodes) for each cache.
+ */
+public class ClientCacheAffinityMapping {
+ /** CacheAffinityInfo for caches with not applicable affinity awareness. */
+ private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
+ new CacheAffinityInfo(null, null);
+
+ /** Topology version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Affinity information for each cache. */
+ private final Map<Integer, CacheAffinityInfo> cacheAffinity = new
HashMap<>();
+
+ /** Unmodifiable collection of cache IDs. To preserve instance
immutability. */
+ private final Collection<Integer> cacheIds =
Collections.unmodifiableCollection(cacheAffinity.keySet());
+
+ /**
+ * @param topVer Topology version.
+ */
+ private ClientCacheAffinityMapping(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ * Gets topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * Gets cache IDs.
+ */
+ public Collection<Integer> cacheIds() {
+ return cacheIds;
+ }
+
+ /**
+ * Calculates affinity node for given cache and key.
+ *
+ * @param binary Binary data processor (needed to extract affinity field
from the key).
+ * @param cacheId Cache ID.
+ * @param key Key.
+ * @return Affinity node id or {@code null} if affinity node can't be
determined for given cache and key.
+ */
+ public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
+ CacheAffinityInfo affinityInfo = cacheAffinity.get(cacheId);
+
+ // Unknown cache, or cache for which affinity awareness is not applicable.
+ if (affinityInfo == null || affinityInfo ==
NOT_APPLICABLE_CACHE_AFFINITY_INFO)
+ return null;
+
+ Object binaryKey = binary.toBinary(key);
+
+ // Key configuration is looked up by the type id of the key class and
+ // yields the id of the field to use for affinity instead of the key.
+ if (!affinityInfo.keyCfg.isEmpty()) {
+ int typeId = binary.typeId(key.getClass().getName());
+
+ Integer fieldId = affinityInfo.keyCfg.get(typeId);
+
+ if (fieldId != null) {
+ if (binaryKey instanceof BinaryObjectExImpl)
+ binaryKey = ((BinaryObjectExImpl)binaryKey).field(fieldId);
+ else // Can't get field value, affinity node can't be
determined in this case.
+ return null;
+ }
+ }
+
+ return affinityInfo.nodeForKey(binaryKey);
+ }
+
+ /**
+ * Merge specified mappings into one instance.
+ *
+ * The result takes its topology version from the first mapping. Mappings
+ * are applied in argument order, so for a cache present in several
+ * mappings the entry of the last one wins.
+ *
+ * @param mappings Mappings to merge; must be non-empty and have identical
+ * topology versions (both checked by assertions only).
+ * @return New mapping containing the affinity info of all given mappings.
+ */
+ public static ClientCacheAffinityMapping merge(ClientCacheAffinityMapping
... mappings) {
+ assert !F.isEmpty(mappings);
+
+ ClientCacheAffinityMapping res = new
ClientCacheAffinityMapping(mappings[0].topVer);
+
+ for (ClientCacheAffinityMapping mapping : mappings) {
+ assert res.topVer.equals(mapping.topVer) : "Mappings must have
identical topology versions [res.topVer=" +
+ res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
+
+ for (Map.Entry<Integer, CacheAffinityInfo> entry :
mapping.cacheAffinity.entrySet())
+ res.cacheAffinity.put(entry.getKey(), entry.getValue());
+ }
+
+ return res;
+ }
+
+ /**
+ * Writes caches affinity request to the output channel.
+ *
+ * @param ch Output channel.
+ * @param cacheIds Cache IDs.
+ */
+ public static void writeRequest(PayloadOutputChannel ch,
Collection<Integer> cacheIds) {
+ BinaryOutputStream out = ch.out();
+
+ out.writeInt(cacheIds.size());
+
+ for (int cacheId : cacheIds)
+ out.writeInt(cacheId);
+ }
+
+ /**
+ * Reads caches affinity response from the input channel and creates
{@code ClientCacheAffinityMapping} instance
+ * from this response.
+ *
+ * Layout as read here: topology version (long), minor topology version
+ * (int), number of mappings (int). Each mapping is an "applicable" flag
+ * (boolean) and a caches count (int), followed by either cache id + key
+ * configuration per cache and one shared partition mapping (applicable),
+ * or just the cache IDs (not applicable).
+ *
+ * @param ch Input channel.
+ * @return Affinity mapping read from the channel.
+ */
+ public static ClientCacheAffinityMapping readResponse(PayloadInputChannel
ch) {
+ try (BinaryReaderExImpl in = new BinaryReaderExImpl(null, ch.in(),
null, true)) {
+ long topVer = in.readLong();
+ int minorTopVer = in.readInt();
+
+ ClientCacheAffinityMapping aff = new ClientCacheAffinityMapping(
+ new AffinityTopologyVersion(topVer, minorTopVer));
+
+ int mappingsCnt = in.readInt();
+
+ for (int i = 0; i < mappingsCnt; i++) {
+ boolean applicable = in.readBoolean();
+
+ int cachesCnt = in.readInt();
+
+ if (applicable) { // Affinity awareness is applicable for these
caches.
+ Map<Integer, Map<Integer, Integer>> cacheKeyCfg =
U.newHashMap(cachesCnt);
+
+ for (int j = 0; j < cachesCnt; j++)
+ cacheKeyCfg.put(in.readInt(),
readCacheKeyConfiguration(in));
+
+ // One partition mapping follows the cache list and is shared
+ // by all caches of this group.
+ UUID[] partToNode = readNodePartitions(in);
+
+ for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg :
cacheKeyCfg.entrySet())
+ aff.cacheAffinity.put(keyCfg.getKey(), new
CacheAffinityInfo(keyCfg.getValue(), partToNode));
+ }
+ else { // Affinity awareness is not applicable for these caches.
+ for (int j = 0; j < cachesCnt; j++)
+ aff.cacheAffinity.put(in.readInt(),
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
+ }
+ }
+
+ return aff;
+ }
+ catch (IOException e) {
+ throw new ClientError(e);
+ }
+ }
+
+ /**
+ * @param in Input reader.
+ */
+ private static Map<Integer, Integer>
readCacheKeyConfiguration(BinaryReaderExImpl in) {
+ int keyCfgCnt = in.readInt();
+
+ Map<Integer, Integer> keyCfg = U.newHashMap(keyCfgCnt);
+
+ for (int i = 0; i < keyCfgCnt; i++)
+ keyCfg.put(in.readInt(), in.readInt());
+
+ return keyCfg;
+ }
+
+ /**
+ * Reads the node-to-partitions section and converts it to an array
+ * indexed by partition number.
+ *
+ * Layout as read here: nodes count (int), then for each node its id
+ * (UUID), partitions count (int) and the partition numbers (int each).
+ *
+ * @param in Input reader.
+ * @return Array of length {@code maxPart + 1}, where element {@code p} is
+ * the id of the node read for partition {@code p}, or {@code null} if
+ * no node was listed for it.
+ */
+ private static UUID[] readNodePartitions(BinaryReaderExImpl in) {
+ int nodesCnt = in.readInt();
+
+ int maxPart = -1;
+
+ // Initial capacity; grown on demand below and trimmed on return.
+ UUID[] partToNode = new UUID[1024];
+
+ for (int i = 0; i < nodesCnt; i++) {
+ UUID nodeId = in.readUuid();
+
+ int partCnt = in.readInt();
+
+ for (int j = 0; j < partCnt; j++) {
+ int part = in.readInt();
+
+ if (part > maxPart) {
+ maxPart = part;
+
+ // Expand partToNode if needed.
+ // Presumably U.ceilPow2 rounds up to a power of two - confirm.
+ if (part >= partToNode.length)
+ partToNode = Arrays.copyOf(partToNode, U.ceilPow2(part
+ 1));
+ }
+
+ partToNode[part] = nodeId;
+ }
+ }
+
+ // Trim to the highest partition number actually seen.
+ return Arrays.copyOf(partToNode, maxPart + 1);
+ }
+
+ /**
+ * Class to store affinity information for cache.
+ */
+ private static class CacheAffinityInfo {
+ /** Key configuration. */
+ private final Map<Integer, Integer> keyCfg;
+
+ /** Partition mapping. */
+ private final UUID[] partMapping;
+
+ /** Affinity mask. */
+ private final int affinityMask;
+
+ /**
+ * @param keyCfg Cache key configuration or {@code null} if affinity
awareness is not applicable for this cache.
+ * @param partMapping Partition to node mapping or {@code null} if
affinity awareness is not applicable for
+ * this cache.
+ */
+ private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[]
partMapping) {
+ this.keyCfg = keyCfg;
+ this.partMapping = partMapping;
+ affinityMask = partMapping != null ?
RendezvousAffinityFunction.calculateMask(partMapping.length) : 0;
+ }
+
+ /**
+ * Calculates node for given key.
+ *
+ * @param key Key.
+ */
+ private UUID nodeForKey(Object key) {
+ assert partMapping != null;
+
+ int part = RendezvousAffinityFunction.calculatePartition(key,
affinityMask, partMapping.length);
+
+ return partMapping[part];
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index de7062e..3cb8790 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.client.thin;
+import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
/**
* Processing thin client requests and responses.
@@ -41,4 +43,19 @@ interface ClientChannel extends AutoCloseable {
* @return Server version.
*/
public ProtocolVersion serverVersion();
+
+ /**
+ * @return Server node ID.
+ */
+ public UUID serverNodeId();
+
+ /**
+ * @return Server topology version.
+ */
+ public AffinityTopologyVersion serverTopologyVersion();
+
+ /**
+ * Add topology change listener.
+ */
+ public void addTopologyChangeListener(Consumer<ClientChannel> lsnr);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
index eaf5055..6260d62 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
@@ -29,63 +29,69 @@ import org.apache.ignite.client.SslProtocol;
*/
final class ClientChannelConfiguration {
/** Host. */
- private InetSocketAddress addr;
+ private final InetSocketAddress addr;
/** Ssl mode. */
- private SslMode sslMode;
+ private final SslMode sslMode;
/** Tcp no delay. */
- private boolean tcpNoDelay;
+ private final boolean tcpNoDelay;
/** Timeout. */
- private int timeout;
+ private final int timeout;
/** Send buffer size. */
- private int sndBufSize;
+ private final int sndBufSize;
/** Receive buffer size. */
- private int rcvBufSize;
+ private final int rcvBufSize;
/** Ssl client certificate key store path. */
- private String sslClientCertKeyStorePath;
+ private final String sslClientCertKeyStorePath;
/** Ssl client certificate key store type. */
- private String sslClientCertKeyStoreType;
+ private final String sslClientCertKeyStoreType;
/** Ssl client certificate key store password. */
- private String sslClientCertKeyStorePwd;
+ private final String sslClientCertKeyStorePwd;
/** Ssl trust certificate key store path. */
- private String sslTrustCertKeyStorePath;
+ private final String sslTrustCertKeyStorePath;
/** Ssl trust certificate key store type. */
- private String sslTrustCertKeyStoreType;
+ private final String sslTrustCertKeyStoreType;
/** Ssl trust certificate key store password. */
- private String sslTrustCertKeyStorePwd;
+ private final String sslTrustCertKeyStorePwd;
/** Ssl key algorithm. */
- private String sslKeyAlgorithm;
+ private final String sslKeyAlgorithm;
/** Ssl protocol. */
- private SslProtocol sslProto;
+ private final SslProtocol sslProto;
/** Ssl trust all. */
- private boolean sslTrustAll;
+ private final boolean sslTrustAll;
/** SSL Context Factory. */
- private Factory<SSLContext> sslCtxFactory;
+ private final Factory<SSLContext> sslCtxFactory;
/** User. */
- private String userName;
+ private final String userName;
/** Password. */
- private String userPwd;
+ private final String userPwd;
+
+ /** Reconnect period (for throttling). */
+ private final long reconnectThrottlingPeriod;
+
+ /** Reconnect retries within period (for throttling). */
+ private final int reconnectThrottlingRetries;
/**
* Constructor.
*/
- ClientChannelConfiguration(ClientConfiguration cfg) {
+ ClientChannelConfiguration(ClientConfiguration cfg, InetSocketAddress
addr) {
this.sslMode = cfg.getSslMode();
this.tcpNoDelay = cfg.isTcpNoDelay();
this.timeout = cfg.getTimeout();
@@ -103,6 +109,9 @@ final class ClientChannelConfiguration {
this.sslCtxFactory = cfg.getSslContextFactory();
this.userName = cfg.getUserName();
this.userPwd = cfg.getUserPassword();
+ this.reconnectThrottlingPeriod = cfg.getReconnectThrottlingPeriod();
+ this.reconnectThrottlingRetries = cfg.getReconnectThrottlingRetries();
+ this.addr = addr;
}
/**
@@ -113,15 +122,6 @@ final class ClientChannelConfiguration {
}
/**
- * @param newVal Address.
- */
- public ClientChannelConfiguration setAddress(InetSocketAddress newVal) {
- addr = newVal;
-
- return this;
- }
-
- /**
* @return SSL Mode.
*/
public SslMode getSslMode() {
@@ -239,4 +239,18 @@ final class ClientChannelConfiguration {
public String getUserPassword() {
return userPwd;
}
+
+ /**
+ * @return Reconnect period (for throttling).
+ */
+ public long getReconnectThrottlingPeriod() {
+ return reconnectThrottlingPeriod;
+ }
+
+ /**
+ * @return Reconnect retries within period (for throttling).
+ */
+ public int getReconnectThrottlingRetries() {
+ return reconnectThrottlingRetries;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index c0017f3..9091849 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -44,6 +44,7 @@ enum ClientOperation {
/** Cache get and replace. */CACHE_GET_AND_REPLACE(1006),
/** Cache put if absent. */CACHE_PUT_IF_ABSENT(1002),
/** Cache clear. */CACHE_CLEAR(1013),
+ /** Cache partitions. */CACHE_PARTITIONS(1101),
/** Query scan. */QUERY_SCAN(2000),
/** Query scan cursor get page. */QUERY_SCAN_CURSOR_GET_PAGE(2001),
/** Query sql. */QUERY_SQL(2002),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 2e8deef..abcf484 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,14 +20,20 @@ package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Deque;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
@@ -36,28 +42,44 @@ import
org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
/**
- * Adds failover abd thread-safety to {@link ClientChannel}.
+ * Communication channel with failover and affinity awareness.
*/
final class ReliableChannel implements AutoCloseable {
- /** Raw channel. */
- private final Function<ClientChannelConfiguration, Result<ClientChannel>>
chFactory;
+ /** Channel factory. */
+ private final Function<ClientChannelConfiguration, ClientChannel>
chFactory;
- /** Servers count. */
- private final int srvCnt;
+ /** Client channel holders for each configured address. */
+ private final ClientChannelHolder[] channels;
- /** Primary server. */
- private InetSocketAddress primary;
+ /** Index of the current channel. */
+ private int curChIdx;
- /** Backup servers. */
- private final Deque<InetSocketAddress> backups = new LinkedList<>();
+ /** Affinity awareness enabled. */
+ private final boolean affinityAwarenessEnabled;
- /** Channel. */
- private ClientChannel ch;
+ /** Cache affinity awareness context. */
+ private final ClientCacheAffinityContext affinityCtx;
- /** Ignite config. */
- private final ClientConfiguration clientCfg;
+ /** Node channels. */
+ private final Map<UUID, ClientChannelHolder> nodeChannels = new
ConcurrentHashMap<>();
+
+ /** Async tasks thread pool. */
+ private final ExecutorService asyncRunner =
Executors.newSingleThreadExecutor(
+ new ThreadFactory() {
+ @Override public Thread newThread(@NotNull Runnable r) {
+ return new Thread(r, "thin-client-channel-async-runner");
+ }
+ }
+ );
+
+ /** Channels reinit was scheduled. */
+ private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
+
+ /** Affinity map update is in progress. */
+ private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
/** Channel is closed. */
private boolean closed;
@@ -66,8 +88,9 @@ final class ReliableChannel implements AutoCloseable {
* Constructor.
*/
ReliableChannel(
- Function<ClientChannelConfiguration, Result<ClientChannel>> chFactory,
- ClientConfiguration clientCfg
+ Function<ClientChannelConfiguration, ClientChannel> chFactory,
+ ClientConfiguration clientCfg,
+ IgniteBinary binary
) throws ClientException {
if (chFactory == null)
throw new NullPointerException("chFactory");
@@ -76,30 +99,34 @@ final class ReliableChannel implements AutoCloseable {
throw new NullPointerException("clientCfg");
this.chFactory = chFactory;
- this.clientCfg = clientCfg;
List<InetSocketAddress> addrs =
parseAddresses(clientCfg.getAddresses());
- srvCnt = addrs.size();
+ channels = new ClientChannelHolder[addrs.size()];
- primary = addrs.get(new Random().nextInt(addrs.size())); // we already
verified there is at least one address
+ for (int i = 0; i < channels.length; i++)
+ channels[i] = new ClientChannelHolder(new
ClientChannelConfiguration(clientCfg, addrs.get(i)));
- for (InetSocketAddress a : addrs) {
- if (a != primary)
- backups.add(a);
- }
+ curChIdx = new Random().nextInt(channels.length); // We already
verified there is at least one address.
+
+ affinityAwarenessEnabled = clientCfg.isAffinityAwarenessEnabled() &&
channels.length > 1;
+
+ affinityCtx = new ClientCacheAffinityContext(binary);
ClientConnectionException lastEx = null;
- for (int i = 0; i < addrs.size(); i++) {
+ for (int i = 0; i < channels.length; i++) {
try {
- ch = chFactory.apply(new
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
+ channels[curChIdx].getOrCreateChannel();
+
+ if (affinityAwarenessEnabled)
+ initAllChannelsAsync();
return;
} catch (ClientConnectionException e) {
lastEx = e;
- rollAddress();
+ rollCurrentChannel();
}
}
@@ -107,14 +134,11 @@ final class ReliableChannel implements AutoCloseable {
}
/** {@inheritDoc} */
- @Override public synchronized void close() throws Exception {
+ @Override public synchronized void close() {
closed = true;
- if (ch != null) {
- ch.close();
-
- ch = null;
- }
+ for (ClientChannelHolder hld : channels)
+ hld.closeChannel();
}
/**
@@ -127,7 +151,7 @@ final class ReliableChannel implements AutoCloseable {
) throws ClientException {
ClientConnectionException failure = null;
- for (int i = 0; i < srvCnt; i++) {
+ for (int i = 0; i < channels.length; i++) {
ClientChannel ch = null;
try {
@@ -141,7 +165,7 @@ final class ReliableChannel implements AutoCloseable {
else
failure.addSuppressed(e);
- changeServer(ch);
+ onChannelFailure(ch);
}
}
@@ -164,6 +188,97 @@ final class ReliableChannel implements AutoCloseable {
}
/**
+ * Send request to affinity node and handle response.
+ */
+ public <T> T affinityService(
+ int cacheId,
+ Object key,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException {
+ if (affinityAwarenessEnabled && !nodeChannels.isEmpty() &&
affinityInfoIsUpToDate(cacheId)) {
+ UUID affinityNodeId = affinityCtx.affinityNode(cacheId, key);
+
+ if (affinityNodeId != null) {
+ ClientChannelHolder hld = nodeChannels.get(affinityNodeId);
+
+ if (hld != null) {
+ ClientChannel ch = null;
+
+ try {
+ ch = hld.getOrCreateChannel();
+
+ return ch.service(op, payloadWriter, payloadReader);
+ }
+ catch (ClientConnectionException ignore) {
+ onChannelFailure(hld, ch);
+ }
+ }
+ }
+ }
+
+ // Can't determine affinity node or request to affinity node failed -
proceed with standard failover service.
+ return service(op, payloadWriter, payloadReader);
+ }
+
+ /**
+ * Checks if affinity information for the cache is up to date and tries to
update it if not.
+ *
+ * @return {@code True} if affinity information is up to date, {@code
false} if there is no affinity information
+ * available for this cache, or the information is obsolete and the update
failed.
+ */
+ private boolean affinityInfoIsUpToDate(int cacheId) {
+ if (affinityCtx.affinityUpdateRequired(cacheId)) {
+ if (affinityUpdateInProgress.compareAndSet(false, true)) {
+ try {
+ ClientCacheAffinityContext.TopologyNodes lastTop =
affinityCtx.lastTopology();
+
+ if (lastTop == null)
+ return false;
+
+ for (UUID nodeId : lastTop.nodes()) {
+ // Abort iterations when topology changed.
+ if (lastTop != affinityCtx.lastTopology())
+ return false;
+
+ ClientChannelHolder hld = nodeChannels.get(nodeId);
+
+ if (hld != null) {
+ ClientChannel ch = null;
+
+ try {
+ ch = hld.getOrCreateChannel();
+
+ return
ch.service(ClientOperation.CACHE_PARTITIONS,
+ affinityCtx::writePartitionsUpdateRequest,
+ affinityCtx::readPartitionsUpdateResponse);
+ }
+ catch (ClientConnectionException ignore) {
+ onChannelFailure(hld, ch);
+ }
+ }
+ }
+
+ // No alive node was found for the last topology
version, so we should reset the affinity context
+ // to let affinity get updated in case of reconnection to
a new cluster (with a lower topology
+ // version).
+ affinityCtx.reset(lastTop);
+ }
+ finally {
+ affinityUpdateInProgress.set(false);
+ }
+ }
+
+ // No suitable nodes found to update affinity, failed to execute
service on all nodes or update is already
+ // in progress by another thread.
+ return false;
+ }
+ else
+ return true;
+ }
+
+ /**
* @return host:port_range address lines parsed as {@link
InetSocketAddress}.
*/
private static List<InetSocketAddress> parseAddresses(String[] addrs)
throws ClientException {
@@ -199,34 +314,157 @@ final class ReliableChannel implements AutoCloseable {
if (closed)
throw new ClientException("Channel is closed");
- if (ch == null) {
- try {
- ch = chFactory.apply(new
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
- }
- catch (ClientConnectionException e) {
- rollAddress();
-
- throw e;
- }
+ try {
+ return channels[curChIdx].getOrCreateChannel();
}
+ catch (ClientConnectionException e) {
+ rollCurrentChannel();
- return ch;
+ throw e;
+ }
}
/** */
- private void rollAddress() {
- if (!backups.isEmpty()) {
- backups.addLast(primary);
+ private synchronized void rollCurrentChannel() {
+ if (++curChIdx >= channels.length)
+ curChIdx = 0;
+ }
- primary = backups.removeFirst();
+ /**
+ * On current channel failure.
+ */
+ private synchronized void onChannelFailure(ClientChannel ch) {
+ // There is nothing wrong if curChIdx was concurrently changed, since
channel was closed by another thread
+ // when current index was changed and no other wrong channel will be
closed by current thread because
+ // onChannelFailure checks the channel bound to the holder before closing
it.
+ onChannelFailure(channels[curChIdx], ch);
+ }
+
+ /**
+ * Handles a failure of the specified holder's channel.
+ */
+ private synchronized void onChannelFailure(ClientChannelHolder hld,
ClientChannel ch) {
+ if (ch == hld.ch && ch != null) {
+ hld.closeChannel();
+
+ if (hld == channels[curChIdx])
+ rollCurrentChannel();
}
}
- /** */
- private synchronized void changeServer(ClientChannel oldCh) {
- if (oldCh == ch && ch != null) {
- rollAddress();
+ /**
+ * Asynchronously try to establish a connection to all configured servers.
+ */
+ private void initAllChannelsAsync() {
+ // Skip if a channels reinit is already scheduled.
+ if (scheduledChannelsReinit.compareAndSet(false, true)) {
+ asyncRunner.submit(
+ () -> {
+ scheduledChannelsReinit.set(false);
+
+ for (ClientChannelHolder hld : channels) {
+ if (scheduledChannelsReinit.get())
+ return; // New reinit task scheduled.
+
+ try {
+ hld.getOrCreateChannel(true);
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+ }
+ }
+ );
+ }
+ }
+
+ /**
+ * Topology version change detected on the channel.
+ *
+ * @param ch Channel.
+ */
+ private void onTopologyChanged(ClientChannel ch) {
+ if (affinityAwarenessEnabled &&
affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(),
+ ch.serverNodeId()))
+ initAllChannelsAsync();
+ }
+
+ /**
+ * Channels holder.
+ */
+ private class ClientChannelHolder {
+ /** Channel configuration. */
+ private final ClientChannelConfiguration chCfg;
+
+ /** Channel. */
+ private volatile ClientChannel ch;
+
+ /** Timestamps of reconnect retries. */
+ private final long[] reconnectRetries;
+
+ /**
+ * @param chCfg Channel config.
+ */
+ private ClientChannelHolder(ClientChannelConfiguration chCfg) {
+ this.chCfg = chCfg;
+
+ reconnectRetries = chCfg.getReconnectThrottlingRetries() > 0 &&
chCfg.getReconnectThrottlingPeriod() > 0L ?
+ new long[chCfg.getReconnectThrottlingRetries()] : null;
+ }
+
+ /**
+ * @return Whether reconnect throttling should be applied.
+ */
+ private boolean applyReconnectionThrottling() {
+ if (reconnectRetries == null)
+ return false;
+
+ long ts = System.currentTimeMillis();
+
+ for (int i = 0; i < reconnectRetries.length; i++) {
+ if (ts - reconnectRetries[i] >=
chCfg.getReconnectThrottlingPeriod()) {
+ reconnectRetries[i] = ts;
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Get or create channel.
+ */
+ private synchronized ClientChannel getOrCreateChannel() {
+ return getOrCreateChannel(false);
+ }
+
+ /**
+ * Get or create channel.
+ */
+ private synchronized ClientChannel getOrCreateChannel(boolean
ignoreThrottling) {
+ if (ch == null) {
+ if (!ignoreThrottling && applyReconnectionThrottling())
+ throw new ClientConnectionException("Reconnect is not
allowed due to applied throttling");
+
+ ch = chFactory.apply(chCfg);
+
+ if (ch.serverNodeId() != null) {
+
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+
+ nodeChannels.values().remove(this);
+
+ nodeChannels.putIfAbsent(ch.serverNodeId(), this);
+ }
+ }
+
+ return ch;
+ }
+ /**
+ * Close channel.
+ */
+ private synchronized void closeChannel() {
U.closeQuiet(ch);
ch = null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java
deleted file mode 100644
index 865599f..0000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.thin;
-
-import org.apache.ignite.client.ClientException;
-
-/**
- * Result of a function throwing an exception.
- */
-final class Result<T> {
- /** Value. */
- private final T val;
-
- /** Exception. */
- private final ClientException ex;
-
- /**
- * Initializes a successful result.
- */
- Result(T val) {
- this.val = val;
- ex = null;
- }
-
- /**
- * Initializes a failed result.
- */
- Result(ClientException ex) {
- if (ex == null)
- throw new NullPointerException("ex");
-
- this.ex = ex;
- val = null;
- }
-
- /**
- * @return Value;
- */
- public T get() throws ClientException {
- if (ex != null)
- throw ex;
-
- return val;
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index e46bb16..fa07556 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.cache.Cache;
import org.apache.ignite.cache.CachePeekMode;
@@ -89,12 +90,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (key == null)
throw new NullPointerException("key");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_GET,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- },
+ null,
this::readObject
);
}
@@ -107,13 +106,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (val == null)
throw new NullPointerException("val");
- ch.request(
+ cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_PUT,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, val);
- }
+ req -> writeObject(req, val),
+ null
);
}
@@ -122,12 +119,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (key == null)
throw new NullPointerException("key");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_CONTAINS_KEY,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- },
+ null,
res -> res.in().readBoolean()
);
}
@@ -220,11 +215,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (newVal == null)
throw new NullPointerException("newVal");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_REPLACE_IF_EQUALS,
req -> {
- writeCacheInfo(req);
- writeObject(req, key);
writeObject(req, oldVal);
writeObject(req, newVal);
},
@@ -240,13 +234,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (val == null)
throw new NullPointerException("val");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_REPLACE,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, val);
- },
+ req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
@@ -256,12 +247,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (key == null)
throw new NullPointerException("key");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_REMOVE_KEY,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- },
+ null,
res -> res.in().readBoolean()
);
}
@@ -274,13 +263,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (oldVal == null)
throw new NullPointerException("oldVal");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_REMOVE_IF_EQUALS,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, oldVal);
- },
+ req -> writeObject(req, oldVal),
res -> res.in().readBoolean()
);
}
@@ -315,13 +301,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (val == null)
throw new NullPointerException("val");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_GET_AND_PUT,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, val);
- },
+ req -> writeObject(req, val),
this::readObject
);
}
@@ -331,12 +314,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (key == null)
throw new NullPointerException("key");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_GET_AND_REMOVE,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- },
+ null,
this::readObject
);
}
@@ -349,13 +330,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (val == null)
throw new NullPointerException("val");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_GET_AND_REPLACE,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, val);
- },
+ req -> writeObject(req, val),
this::readObject
);
}
@@ -368,13 +346,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
if (val == null)
throw new NullPointerException("val");
- return ch.service(
+ return cacheSingleKeyOperation(
+ key,
ClientOperation.CACHE_PUT_IF_ABSENT,
- req -> {
- writeCacheInfo(req);
- writeObject(req, key);
- writeObject(req, val);
- },
+ req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
@@ -506,6 +481,30 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
));
}
+
+ /**
+ * Execute cache operation with a single key.
+ */
+ private <T> T cacheSingleKeyOperation(
+ K key,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> additionalPayloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) throws ClientException {
+ Consumer<PayloadOutputChannel> payloadWriter = req -> {
+ writeCacheInfo(req);
+ writeObject(req, key);
+
+ if (additionalPayloadWriter != null)
+ additionalPayloadWriter.accept(req);
+ };
+
+ // A transactional operation cannot be executed on the affinity node; it
should be executed on the node that started
+ // the transaction.
+ return transactions.tx() == null ? ch.affinityService(cacheId, key,
op, payloadWriter, payloadReader) :
+ ch.service(op, payloadWriter, payloadReader);
+ }
+
/** Write cache ID and flags. */
private void writeCacheInfo(PayloadOutputChannel payloadCh) {
BinaryOutputStream out = payloadCh.out();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index e31c614..e23e20b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -35,7 +35,9 @@ import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -70,6 +72,7 @@ import
org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.platform.client.ClientFlag;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -101,6 +104,12 @@ class TcpClientChannel implements ClientChannel {
/** Protocol version agreed with the server. */
private ProtocolVersion ver = CURRENT_VER;
+ /** Server node ID. */
+ private UUID srvNodeId;
+
+ /** Server topology version. */
+ private AffinityTopologyVersion srvTopVer;
+
/** Channel. */
private final Socket sock;
@@ -122,6 +131,9 @@ class TcpClientChannel implements ClientChannel {
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new
ConcurrentHashMap<>();
+ /** Topology change listeners. */
+ private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new
CopyOnWriteArrayList<>();
+
/** Constructor. */
TcpClientChannel(ClientChannelConfiguration cfg) throws
ClientConnectionException, ClientAuthenticationException {
validateConfiguration(cfg);
@@ -277,9 +289,13 @@ class TcpClientChannel implements ClientChannel {
short flags = dataInput.readShort();
if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
- // TODO: IGNITE-11898 Implement Best Effort Affinity for java
thin client.
- dataInput.readLong(); // topVer.
- dataInput.readInt(); // minorTopVer.
+ long topVer = dataInput.readLong();
+ int minorTopVer = dataInput.readInt();
+
+ srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
+
+ for (Consumer<ClientChannel> lsnr : topChangeLsnrs)
+ lsnr.accept(this);
}
if ((flags & ClientFlag.ERROR) != 0)
@@ -315,6 +331,21 @@ class TcpClientChannel implements ClientChannel {
return ver;
}
+ /** {@inheritDoc} */
+ @Override public UUID serverNodeId() {
+ return srvNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion serverTopologyVersion() {
+ return srvTopVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addTopologyChangeListener(Consumer<ClientChannel>
lsnr) {
+ topChangeLsnrs.add(lsnr);
+ }
+
/** Validate {@link ClientConfiguration}. */
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
@@ -402,8 +433,7 @@ class TcpClientChannel implements ClientChannel {
try (BinaryReaderExImpl r = new BinaryReaderExImpl(null, res, null,
true)) {
if (res.readBoolean()) { // Success flag.
if (ver.compareTo(V1_4_0) >= 0)
- // TODO: IGNITE-11898 Implement Best Effort Affinity for
java thin client.
- r.readUuid(); // Server node UUID.
+ srvNodeId = r.readUuid(); // Server node UUID.
}
else {
ProtocolVersion srvVer = new ProtocolVersion(res.readShort(),
res.readShort(), res.readShort());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 2f6e432..b6226a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -75,20 +75,19 @@ public class TcpIgniteClient implements IgniteClient {
/**
* Private constructor. Use {@link
TcpIgniteClient#start(ClientConfiguration)} to create an instance of
- * {@link TcpClientChannel}.
+ * {@code TcpIgniteClient}.
*/
private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
- Function<ClientChannelConfiguration, Result<ClientChannel>> chFactory
= chCfg -> {
- try {
- return new Result<>(new TcpClientChannel(chCfg));
- }
- catch (ClientException e) {
- return new Result<>(e);
- }
- };
-
- ch = new ReliableChannel(chFactory, cfg);
+ this(TcpClientChannel::new, cfg);
+ }
+ /**
+ * Constructor with custom channel factory.
+ */
+ TcpIgniteClient(
+ Function<ClientChannelConfiguration, ClientChannel> chFactory,
+ ClientConfiguration cfg
+ ) throws ClientException {
marsh = new ClientBinaryMarshaller(new ClientBinaryMetadataHandler(),
new ClientMarshallerContext());
marsh.setBinaryConfiguration(cfg.getBinaryConfiguration());
@@ -97,6 +96,8 @@ public class TcpIgniteClient implements IgniteClient {
binary = new ClientBinary(marsh);
+ ch = new ReliableChannel(chFactory, cfg, binary);
+
transactions = new TcpClientTransactions(ch, marsh,
new
ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 1bb3698..a746c3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -61,6 +61,7 @@ public class ReliabilityTest extends GridCommonAbstractTest {
try (LocalIgniteCluster cluster =
LocalIgniteCluster.start(CLUSTER_SIZE);
IgniteClient client = Ignition.startClient(new
ClientConfiguration()
+ .setReconnectThrottlingRetries(0) // Disable throttling.
.setAddresses(cluster.clientAddresses().toArray(new
String[CLUSTER_SIZE]))
)
) {
@@ -262,6 +263,51 @@ public class ReliabilityTest extends
GridCommonAbstractTest {
}
/**
+ * Test reconnection throttling.
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testReconnectionThrottling() throws Exception {
+ int throttlingRetries = 5;
+ long throttlingPeriod = 3_000L;
+
+ try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
+ IgniteClient client = Ignition.startClient(new
ClientConfiguration()
+ .setReconnectThrottlingPeriod(throttlingPeriod)
+ .setReconnectThrottlingRetries(throttlingRetries)
+ .setAddresses(cluster.clientAddresses().toArray(new
String[1])))
+ ) {
+ ClientCache<Integer, Integer> cache = client.createCache("cache");
+
+ for (int i = 0; i < throttlingRetries; i++) {
+ // Attempts to reconnect within throttlingRetries should pass.
+ cache.put(0, 0);
+
+ dropAllThinClientConnections(Ignition.allGrids().get(0));
+
+ GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0),
ClientConnectionException.class);
+ }
+
+ for (int i = 0; i < 10; i++) // Attempts to reconnect after
throttlingRetries should fail.
+ GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0),
ClientConnectionException.class);
+
+ doSleep(throttlingPeriod);
+
+ // Attempt to reconnect after throttlingPeriod should pass.
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ try {
+ cache.put(0, 0);
+
+ return true;
+ }
+ catch (ClientConnectionException e) {
+ return false;
+ }
+ }, throttlingPeriod));
+ }
+ }
+
+ /**
* Drop all thin client connections on given Ignite instance.
*
* @param ignite Ignite.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
new file mode 100644
index 0000000..d26815d
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+
+/**
+ * Abstract thin client affinity awareness test.
+ */
+public abstract class ThinClientAbstractAffinityAwarenessTest extends
GridCommonAbstractTest {
+ /** Wait timeout. */
+ private static final long WAIT_TIMEOUT = 5_000L;
+
+ /** Replicated cache name. */
+ protected static final String REPL_CACHE_NAME = "replicated_cache";
+
+ /** Partitioned cache name. */
+ protected static final String PART_CACHE_NAME = "partitioned_cache";
+
+ /** Partitioned with custom affinity cache name. */
+ protected static final String PART_CUSTOM_AFFINITY_CACHE_NAME =
"partitioned_custom_affinity_cache";
+
+ /** Keys count. */
+ protected static final int KEY_CNT = 30;
+
+ /** Max cluster size. */
+ protected static final int MAX_CLUSTER_SIZE = 4;
+
+ /** Channels. */
+ protected final TestTcpClientChannel[] channels = new
TestTcpClientChannel[MAX_CLUSTER_SIZE];
+
+ /** Operations queue. */
+ protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue
= new ConcurrentLinkedQueue<>();
+
+ /** Default channel. */
+ protected TestTcpClientChannel dfltCh;
+
+ /** Client instance. */
+ protected IgniteClient client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ CacheConfiguration ccfg0 = new CacheConfiguration()
+ .setName(REPL_CACHE_NAME)
+ .setCacheMode(CacheMode.REPLICATED);
+
+ CacheConfiguration ccfg1 = new CacheConfiguration()
+ .setName(PART_CUSTOM_AFFINITY_CACHE_NAME)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setAffinity(new CustomAffinityFunction());
+
+ CacheConfiguration ccfg2 = new CacheConfiguration()
+ .setName(PART_CACHE_NAME)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setKeyConfiguration(
+ new
CacheKeyConfiguration(TestNotAnnotatedAffinityKey.class.getName(),
"affinityKey"),
+ new CacheKeyConfiguration(TestAnnotatedAffinityKey.class));
+
+ return cfg.setCacheConfiguration(ccfg0, ccfg1, ccfg2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ opsQueue.clear();
+ }
+
+ /**
+ * Checks that operation goes through specified channel.
+ */
+ protected void assertOpOnChannel(TestTcpClientChannel expCh,
ClientOperation expOp) {
+ T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+
+ assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ",
expOp=" + expOp + ']', nextChOp);
+
+ assertEquals("Unexpected channel for opertation [expCh=" + expCh + ",
expOp=" + expOp +
+ ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+
+ assertEquals("Unexpected operation on channel [expCh=" + expCh + ",
expOp=" + expOp +
+ ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
+ }
+
+ /**
+ * Calculates affinity channel for cache and key.
+ */
+ protected TestTcpClientChannel affinityChannel(Object key,
IgniteInternalCache<Object, Object> cache) {
+ Collection<ClusterNode> nodes =
cache.affinity().mapKeyToPrimaryAndBackups(key);
+
+ UUID nodeId = nodes.iterator().next().id();
+
+ for (int i = 0; i < channels.length; i++) {
+ if (channels[i] != null &&
nodeId.equals(channels[i].serverNodeId()))
+ return channels[i];
+ }
+
+ return dfltCh;
+ }
+
+ /**
+ * @param nodeIdxs Node indexes to connect to.
+ */
+ protected ClientConfiguration getClientConfiguration(int... nodeIdxs) {
+ String addrs[] = Arrays.stream(nodeIdxs).mapToObj(nodeIdx ->
"127.0.0.1:" + (DFLT_PORT + nodeIdx))
+ .toArray(String[]::new);
+
+ return new
ClientConfiguration().setAddresses(addrs).setAffinityAwarenessEnabled(true);
+ }
+
+ /**
+ * @param clientCfg Client configuration.
+ * @param chIdxs Channels to wait for initialization.
+ */
+ protected void initClient(ClientConfiguration clientCfg, int... chIdxs)
throws IgniteInterruptedCheckedException {
+ client = new TcpIgniteClient(cfg -> {
+ try {
+ log.info("Establishing connection to " + cfg.getAddress());
+
+ TcpClientChannel ch = new TestTcpClientChannel(cfg);
+
+ log.info("Channel initialized: " + ch);
+
+ return ch;
+ }
+ catch (Exception e) {
+ log.warning("Failed to initialize channel: " + e.getMessage());
+
+ throw e;
+ }
+ }, clientCfg);
+
+ awaitChannelsInit(chIdxs);
+
+ initDefaultChannel();
+ }
+
+ /**
+ *
+ */
+ protected void initDefaultChannel() {
+ opsQueue.clear();
+
+ // Send non-affinity request to determine default channel.
+ client.getOrCreateCache(REPL_CACHE_NAME);
+
+ T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+
+ assertNotNull(nextChOp);
+
+ assertEquals(nextChOp.get2(),
ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+
+ dfltCh = nextChOp.get1();
+ }
+
+ /**
+ * @param chIdxs Channel idxs.
+ */
+ protected void awaitChannelsInit(int... chIdxs) throws
IgniteInterruptedCheckedException {
+ // Wait until all channels initialized.
+ for (int ch : chIdxs) {
+ assertTrue("Failed to wait for channel[" + ch + "] init",
+ GridTestUtils.waitForCondition(() -> channels[ch] != null,
WAIT_TIMEOUT));
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CustomAffinityFunction extends
RendezvousAffinityFunction {
+ // No-op.
+ }
+
+ /**
+ * Test class without affinity key.
+ */
+ protected static class TestComplexKey {
+ /** */
+ int firstField;
+
+ /** Another field. */
+ int secondField;
+
+ /** */
+ public TestComplexKey(int firstField, int secondField) {
+ this.firstField = firstField;
+ this.secondField = secondField;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return firstField + secondField;
+ }
+ }
+
+ /**
+ * Test class with annotated affinity key.
+ */
+ protected static class TestAnnotatedAffinityKey {
+ /** */
+ @AffinityKeyMapped
+ int affinityKey;
+
+ /** */
+ int anotherField;
+
+ /** */
+ public TestAnnotatedAffinityKey(int affinityKey, int anotherField) {
+ this.affinityKey = affinityKey;
+ this.anotherField = anotherField;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return affinityKey + anotherField;
+ }
+ }
+
+ /**
+ * Test class with affinity key without annotation.
+ */
+ protected static class TestNotAnnotatedAffinityKey {
+ /** */
+ TestComplexKey affinityKey;
+
+ /** */
+ int anotherField;
+
+ /** */
+ public TestNotAnnotatedAffinityKey(TestComplexKey affinityKey, int
anotherField) {
+ this.affinityKey = affinityKey;
+ this.anotherField = anotherField;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return affinityKey.hashCode() + anotherField;
+ }
+ }
+
+ /**
+ * Test TCP client channel.
+ */
+ protected class TestTcpClientChannel extends TcpClientChannel {
+ /** Channel configuration. */
+ private final ClientChannelConfiguration cfg;
+
+ /**
+ * @param cfg Config.
+ */
+ public TestTcpClientChannel(ClientChannelConfiguration cfg) {
+ super(cfg);
+
+ this.cfg = cfg;
+
+ int chIdx = cfg.getAddress().getPort() - DFLT_PORT;
+
+ channels[chIdx] = this;
+
+ addTopologyChangeListener(ch -> log.info("Topology change detected
[ch=" + ch + ", topVer=" +
+ ch.serverTopologyVersion() + ']'));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T service(ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader) throws
ClientException {
+ T res = super.service(op, payloadWriter, payloadReader);
+
+ // Store all operations except binary type registration in queue
to check later.
+ if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME && op !=
ClientOperation.PUT_BINARY_TYPE)
+ opsQueue.offer(new T2<>(this, op));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return cfg.getAddress().toString();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
new file mode 100644
index 0000000..7854d4b
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+/**
+ * Test affinity awareness of thin client on stable topology.
+ */
+public class ThinClientAffinityAwarenessStableTopologyTest extends
ThinClientAbstractAffinityAwarenessTest {
+ /** {@inheritDoc} Also starts 3 grids and awaits partition map exchange. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(3);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} Also calls {@code initClient} with a configuration that skips node 0. */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Add one extra node address (index 3, no such grid is started here) to the list, skip node 0.
+ initClient(getClientConfiguration(1, 2, 3), 1, 2);
+ }
+
+ /**
+ * Test that affinity awareness is not applicable for replicated cache.
+ */
+ @Test
+ public void testReplicatedCache() {
+ testNotApplicableCache(REPL_CACHE_NAME);
+ }
+
+ /**
+ * Test that affinity awareness is not applicable for partitioned cache
with custom affinity function.
+ */
+ @Test
+ public void testPartitionedCustomAffinityCache() {
+ testNotApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+ }
+
+ /**
+ * Test affinity awareness for all applicable operation types for
partitioned cache with primitive key.
+ */
+ @Test
+ public void testPartitionedCachePrimitiveKey() {
+ testApplicableCache(PART_CACHE_NAME, i -> i);
+ }
+
+ /**
+ * Test affinity awareness for all applicable operation types for
partitioned cache with complex key.
+ */
+ @Test
+ public void testPartitionedCacheComplexKey() {
+ testApplicableCache(PART_CACHE_NAME, i -> new TestComplexKey(i, i));
+ }
+
+ /**
+ * Test affinity awareness for all applicable operation types for
partitioned cache with annotated affinity
+ * mapped key.
+ */
+ @Test
+ public void testPartitionedCacheAnnotatedAffinityKey() {
+ testApplicableCache(PART_CACHE_NAME, i -> new
TestAnnotatedAffinityKey(i, i));
+ }
+
+ /**
+ * Test affinity awareness for all applicable operation types for
partitioned cache with not annotated affinity
+ * mapped key.
+ */
+ @Test
+ public void testPartitionedCacheNotAnnotatedAffinityKey() {
+ testApplicableCache(PART_CACHE_NAME, i -> new
TestNotAnnotatedAffinityKey(new TestComplexKey(i, i), i));
+ }
+
+ /**
+ * Test request to a partition mapped to a node that is unknown to the client.
+ */
+ @Test
+ public void testPartitionedCacheUnknownNode() throws
IgniteCheckedException {
+ ClientCache<Object, Object> clientCache =
client.cache(PART_CACHE_NAME);
+
+ // We didn't include the grid(0) address in the list of addresses known to
the client, so client don't have connection
+ // with grid(0)
+ Integer keyForUnknownNode = primaryKey(grid(0).cache(PART_CACHE_NAME));
+
+ assertNotNull("Not found key for node " + grid(0).localNode().id(),
keyForUnknownNode);
+
+ clientCache.put(keyForUnknownNode, 0);
+
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+ }
+
+ /**
+ * @param cacheName Name of a cache for which all put/get requests are expected on the default channel.
+ */
+ private void testNotApplicableCache(String cacheName) {
+ ClientCache<Object, Object> cache = client.cache(cacheName);
+
+ // After first response we should send partitions request on default
channel together with next request.
+ cache.put(0, 0);
+
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+ for (int i = 1; i < KEY_CNT; i++) {
+ cache.put(i, i);
+
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+ cache.get(i);
+
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET);
+ }
+ }
+
+ /**
+ * @param cacheName Name of a cache for which single-key requests are expected on the affinity channel of the key.
+ * @param keyFactory Function producing a cache key from an int index.
+ */
+ private void testApplicableCache(String cacheName, Function<Integer,
Object> keyFactory) {
+ ClientCache<Object, Object> clientCache = client.cache(cacheName);
+ IgniteInternalCache<Object, Object> igniteCache =
grid(0).context().cache().cache(cacheName);
+
+ clientCache.put(keyFactory.apply(0), 0);
+
+ TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0),
igniteCache);
+
+ // Default channel is the first one that detects topology change, so next
partition request will go through
+ // the default channel.
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+ for (int i = 1; i < KEY_CNT; i++) {
+ Object key = keyFactory.apply(i);
+
+ opCh = affinityChannel(key, igniteCache);
+
+ clientCache.put(key, key);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+ clientCache.get(key);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_GET);
+
+ clientCache.containsKey(key);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_CONTAINS_KEY);
+
+ clientCache.replace(key, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_REPLACE);
+
+ clientCache.replace(key, i, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_REPLACE_IF_EQUALS);
+
+ clientCache.remove(key);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_REMOVE_KEY);
+
+ clientCache.remove(key, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_REMOVE_IF_EQUALS);
+
+ clientCache.getAndPut(key, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_PUT);
+
+ clientCache.getAndRemove(key);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_REMOVE);
+
+ clientCache.getAndReplace(key, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_REPLACE);
+
+ clientCache.putIfAbsent(key, i);
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_PUT_IF_ABSENT);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
new file mode 100644
index 0000000..fcf64d6
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.junit.Test;
+
+/**
+ * Test affinity awareness of thin client on unstable topology.
+ */
+public class ThinClientAffinityAwarenessUnstableTopologyTest extends
ThinClientAbstractAffinityAwarenessTest {
+ /** {@inheritDoc} Also stops all grids. */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Test that join of the new node is detected by the client and affects
affinity awareness.
+ */
+ @Test
+ public void testAffinityAwarenessOnNodeJoin() throws Exception {
+ startGrids(3);
+
+ awaitPartitionMapExchange();
+
+ initClient(getClientConfiguration(1, 2, 3), 1, 2);
+
+ // Test affinity awareness before node join.
+ testAffinityAwareness(true);
+
+ assertNull(channels[3]);
+
+ startGrid(3);
+
+ awaitPartitionMapExchange();
+
+ // Send non-affinity request to detect topology change.
+ ClientCache<Object, Object> cache =
client.getOrCreateCache(PART_CACHE_NAME);
+
+ awaitChannelsInit(3);
+
+ assertOpOnChannel(dfltCh,
ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+
+ Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME));
+
+ assertNotNull("Not found key for node 3", key);
+
+ cache.put(key, 0);
+
+ // Cache partitions are requested from default channel, since it's
first (and currently the only) channel
+ // which detects new topology.
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+
+ assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT);
+
+ // Test affinity awareness after node join.
+ testAffinityAwareness(false);
+ }
+
+ /**
+ * Test that node left event affects affinity awareness.
+ */
+ @Test
+ public void testAffinityAwarenessOnNodeLeft() throws Exception {
+ startGrids(4);
+
+ awaitPartitionMapExchange();
+
+ initClient(getClientConfiguration(1, 2, 3), 1, 2, 3);
+
+ // Test affinity awareness before node left.
+ testAffinityAwareness(true);
+
+ stopGrid(3);
+
+ channels[3] = null;
+
+ awaitPartitionMapExchange();
+
+ // Next request will also detect topology change.
+ initDefaultChannel();
+
+ // Test affinity awareness after node left.
+ testAffinityAwareness(true);
+ }
+
+ /**
+ * Test connection restore to affinity nodes.
+ */
+ @Test
+ public void testConnectionLoss() throws Exception {
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ initClient(getClientConfiguration(0, 1), 0, 1);
+
+ // Test affinity awareness before connection to node lost.
+ testAffinityAwareness(true);
+
+ // Choose node to disconnect (not default node).
+ int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0;
+
+ // Drop all thin connections from the node.
+ ObjectName mbeanName = U.makeMBeanName(grid(disconnectNodeIdx).name(),
"Clients",
+ ClientListenerProcessor.class.getSimpleName());
+
+
MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
mbeanName,
+ ClientProcessorMXBean.class, true).dropAllConnections();
+
+ channels[disconnectNodeIdx] = null;
+
+ // Send request to disconnected node.
+ ClientCache<Object, Object> cache = client.cache(PART_CACHE_NAME);
+
+ Integer key =
primaryKey(grid(disconnectNodeIdx).cache(PART_CACHE_NAME));
+
+ assertNotNull("Not found key for node " + disconnectNodeIdx, key);
+
+ cache.put(key, 0);
+
+ // Request goes to default channel, since affinity node is
disconnected.
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+ cache.put(key, 0);
+
+ // Connection to disconnected node should be restored after retry.
+ assertOpOnChannel(channels[disconnectNodeIdx],
ClientOperation.CACHE_PUT);
+
+ // Test affinity awareness.
+ testAffinityAwareness(false);
+ }
+
+ /**
+ * Test that affinity awareness works when reconnecting to the new cluster
(with lower topology version).
+ */
+ @Test
+ public void testAffinityAwarenessOnClusterRestart() throws Exception {
+ startGrids(3);
+
+ awaitPartitionMapExchange();
+
+ initClient(getClientConfiguration(0, 1, 2), 0, 1, 2);
+
+ // Test affinity awareness before cluster restart.
+ testAffinityAwareness(true);
+
+ stopAllGrids();
+
+ for (int i = 0; i < channels.length; i++)
+ channels[i] = null;
+
+ // Start 2 grids, so topology version of the new cluster will be less
then old cluster.
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ // Send any request to failover.
+ try {
+ client.cache(REPL_CACHE_NAME).put(0, 0);
+ }
+ catch (Exception expected) {
+ // No-op: this request is sent only to trigger failover, so a failure here is ignored.
+ }
+
+ initDefaultChannel();
+
+ awaitChannelsInit(0, 1);
+
+ testAffinityAwareness(true);
+ }
+
+ /**
+ * Checks that each put request goes to the channel returned by {@code affinityChannel} for its key.
+ *
+ * @param partReq Whether the first put is expected to come with a partitions request on the default channel.
+ */
+ private void testAffinityAwareness(boolean partReq) {
+ ClientCache<Object, Object> clientCache =
client.cache(PART_CACHE_NAME);
+ IgniteInternalCache<Object, Object> igniteCache =
grid(0).context().cache().cache(PART_CACHE_NAME);
+
+ for (int i = 0; i < KEY_CNT; i++) {
+ TestTcpClientChannel opCh = affinityChannel(i, igniteCache);
+
+ clientCache.put(i, i);
+
+ if (partReq) {
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+
+ partReq = false;
+ }
+
+ assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 80a77ff..b27aba9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,8 @@
package org.apache.ignite.client;
+import
org.apache.ignite.internal.client.thin.ThinClientAffinityAwarenessStableTopologyTest;
+import
org.apache.ignite.internal.client.thin.ThinClientAffinityAwarenessUnstableTopologyTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -37,7 +39,9 @@ import org.junit.runners.Suite;
SslParametersTest.class,
ConnectionTest.class,
ConnectToStartingNodeTest.class,
- AsyncChannelTest.class
+ AsyncChannelTest.class,
+ ThinClientAffinityAwarenessStableTopologyTest.class,
+ ThinClientAffinityAwarenessUnstableTopologyTest.class
})
public class ClientTestSuite {
// No-op.