This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d72a123  IGNITE-11898 Java thin client: Affinity awareness support - 
Fixes #6980.
d72a123 is described below

commit d72a123e122ffe1b4f715f98c2db5d79293f0c90
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Nov 12 17:57:42 2019 +0300

    IGNITE-11898 Java thin client: Affinity awareness support - Fixes #6980.
---
 .../ignite/configuration/ClientConfiguration.java  |  73 +++++
 .../ignite/internal/client/thin/ClientBinary.java  |   3 +-
 .../client/thin/ClientCacheAffinityContext.java    | 227 ++++++++++++++
 .../client/thin/ClientCacheAffinityMapping.java    | 269 ++++++++++++++++
 .../ignite/internal/client/thin/ClientChannel.java |  17 +
 .../client/thin/ClientChannelConfiguration.java    |  70 +++--
 .../internal/client/thin/ClientOperation.java      |   1 +
 .../internal/client/thin/ReliableChannel.java      | 344 +++++++++++++++++----
 .../apache/ignite/internal/client/thin/Result.java |  60 ----
 .../internal/client/thin/TcpClientCache.java       | 117 ++++---
 .../internal/client/thin/TcpClientChannel.java     |  40 ++-
 .../internal/client/thin/TcpIgniteClient.java      |  23 +-
 .../org/apache/ignite/client/ReliabilityTest.java  |  46 +++
 .../ThinClientAbstractAffinityAwarenessTest.java   | 322 +++++++++++++++++++
 ...nClientAffinityAwarenessStableTopologyTest.java | 206 ++++++++++++
 ...lientAffinityAwarenessUnstableTopologyTest.java | 220 +++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |   6 +-
 17 files changed, 1826 insertions(+), 218 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 3949919..a4fe74c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -93,6 +93,25 @@ public final class ClientConfiguration implements 
Serializable {
     private ClientTransactionConfiguration txCfg = new 
ClientTransactionConfiguration();
 
     /**
+     * Whether affinity awareness should be enabled.
+     *
+     * When {@code true}, the client attempts to send each request directly to the 
primary node for the given cache key.
+     * To do so, a connection is established to every known server node.
+     * By default ({@code false}), only one connection is established at a given 
moment to a random server node.
+     */
+    private boolean affinityAwarenessEnabled;
+
+    /**
+     * Reconnect throttling period (in milliseconds). No more than 
{@code reconnectThrottlingRetries}
+     * attempts to reconnect will be made within {@code 
reconnectThrottlingPeriod} in case of connection loss.
+     * Throttling is disabled if either {@code reconnectThrottlingRetries} or 
{@code reconnectThrottlingPeriod} is 0.
+     */
+    private long reconnectThrottlingPeriod = 30_000L;
+
+    /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
+    private int reconnectThrottlingRetries = 3;
+
+    /**
      * @return Host addresses.
      */
     public String[] getAddresses() {
@@ -416,6 +435,60 @@ public final class ClientConfiguration implements 
Serializable {
         return this;
     }
 
+    /**
+     * @return Whether affinity awareness should be enabled.
+     */
+    public boolean isAffinityAwarenessEnabled() {
+        return affinityAwarenessEnabled;
+    }
+
+    /**
+     * Enable or disable affinity awareness.
+     *
+     * @return {@code this} for chaining.
+     */
+    public ClientConfiguration setAffinityAwarenessEnabled(boolean 
affinityAwarenessEnabled) {
+        this.affinityAwarenessEnabled = affinityAwarenessEnabled;
+
+        return this;
+    }
+
+    /**
+     * Gets reconnect throttling period.
+     */
+    public long getReconnectThrottlingPeriod() {
+        return reconnectThrottlingPeriod;
+    }
+
+    /**
+     * Sets reconnect throttling period.
+     *
+     * @return {@code this} for chaining.
+     */
+    public ClientConfiguration setReconnectThrottlingPeriod(long 
reconnectThrottlingPeriod) {
+        this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
+
+        return this;
+    }
+
+    /**
+     * Gets reconnect throttling retries.
+     */
+    public int getReconnectThrottlingRetries() {
+        return reconnectThrottlingRetries;
+    }
+
+    /**
+     * Sets reconnect throttling retries.
+     *
+     * @return {@code this} for chaining.
+     */
+    public ClientConfiguration setReconnectThrottlingRetries(int 
reconnectThrottlingRetries) {
+        this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ClientConfiguration.class, this);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
index 4655cbf..8f381a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientBinary.java
@@ -27,6 +27,7 @@ import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
 import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 
@@ -54,7 +55,7 @@ class ClientBinary implements IgniteBinary {
         if (obj == null)
             return null;
 
-        if (obj instanceof IgniteBinary)
+        if (BinaryUtils.isBinaryType(obj.getClass()))
             return (T)obj;
 
         byte[] objBytes = marsh.marshal(obj);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
new file mode 100644
index 0000000..29bff6e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+
+/**
+ * Client cache affinity awareness context.
+ */
+public class ClientCacheAffinityContext {
+    /** Binary data processor. */
+    private final IgniteBinary binary;
+
+    /** Contains last topology version and known nodes of this version. */
+    private final AtomicReference<TopologyNodes> lastTop = new 
AtomicReference<>();
+
+    /** Current affinity mapping. */
+    private volatile ClientCacheAffinityMapping affinityMapping;
+
+    /** Cache IDs that should be included in the next affinity mapping 
request. */
+    private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+
+    /**
+     * @param binary Binary data processor.
+     */
+    public ClientCacheAffinityContext(IgniteBinary binary) {
+        this.binary = binary;
+    }
+
+    /**
+     * Update topology version if it's greater than current version and store 
nodes for last topology.
+     *
+     * @param topVer Topology version.
+     * @param nodeId Node id.
+     * @return {@code True} if last topology was updated to the new version.
+     */
+    public boolean updateLastTopologyVersion(AffinityTopologyVersion topVer, 
UUID nodeId) {
+        while (true) {
+            TopologyNodes lastTop = this.lastTop.get();
+
+            if (lastTop == null || topVer.compareTo(lastTop.topVer) > 0) {
+                if (this.lastTop.compareAndSet(lastTop, new 
TopologyNodes(topVer, nodeId)))
+                    return true;
+            }
+            else if (topVer.equals(lastTop.topVer)) {
+                lastTop.nodes.add(nodeId);
+
+                return false;
+            }
+            else
+                return false;
+        }
+    }
+
+    /**
+     * Is affinity update required for given cache.
+     *
+     * @param cacheId Cache id.
+     */
+    public boolean affinityUpdateRequired(int cacheId) {
+        TopologyNodes top = lastTop.get();
+
+        if (top == null) { // Don't know current topology.
+            pendingCacheIds.add(cacheId);
+
+            return false;
+        }
+
+        ClientCacheAffinityMapping mapping = affinityMapping;
+
+        if (mapping == null) {
+            pendingCacheIds.add(cacheId);
+
+            return true;
+        }
+
+        if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+            pendingCacheIds.add(cacheId);
+
+            return true;
+        }
+
+        if (mapping.cacheIds().contains(cacheId))
+            return false;
+        else {
+            pendingCacheIds.add(cacheId);
+
+            return true;
+        }
+    }
+
+    /**
+     * @param ch Payload output channel.
+     */
+    public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
+        ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+    }
+
+    /**
+     * @param ch Payload input channel.
+     */
+    public synchronized boolean 
readPartitionsUpdateResponse(PayloadInputChannel ch) {
+        if (lastTop.get() == null)
+            return false;
+
+        ClientCacheAffinityMapping newMapping = 
ClientCacheAffinityMapping.readResponse(ch);
+
+        ClientCacheAffinityMapping oldMapping = affinityMapping;
+
+        if (oldMapping == null || 
newMapping.topologyVersion().compareTo(oldMapping.topologyVersion()) > 0) {
+            affinityMapping = newMapping;
+
+            if (oldMapping != null)
+                pendingCacheIds.addAll(oldMapping.cacheIds());
+
+            pendingCacheIds.removeAll(newMapping.cacheIds());
+
+            return true;
+        }
+
+        if (newMapping.topologyVersion().equals(oldMapping.topologyVersion())) 
{
+            affinityMapping = ClientCacheAffinityMapping.merge(oldMapping, 
newMapping);
+
+            pendingCacheIds.removeAll(newMapping.cacheIds());
+
+            return true;
+        }
+
+        // Obsolete mapping.
+        return true;
+    }
+
+    /**
+     * Gets last topology information.
+     */
+    public TopologyNodes lastTopology() {
+        return lastTop.get();
+    }
+
+    /**
+     * Resets affinity context.
+     *
+     * @param top Topology which triggers reset.
+     */
+    public synchronized void reset(TopologyNodes top) {
+        if (lastTop.compareAndSet(top, null)) {
+            affinityMapping = null;
+
+            pendingCacheIds.clear();
+        }
+    }
+
+    /**
+     * Calculates affinity node for given cache and key.
+     *
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and key.
+     */
+    public UUID affinityNode(int cacheId, Object key) {
+        TopologyNodes top = lastTop.get();
+
+        if (top == null)
+            return null;
+
+        ClientCacheAffinityMapping mapping = affinityMapping;
+
+        if (mapping == null)
+            return null;
+
+        if (top.topVer.compareTo(mapping.topologyVersion()) > 0)
+            return null;
+
+        return mapping.affinityNode(binary, cacheId, key);
+    }
+
+    /**
+     * Holder for list of nodes for topology version.
+     */
+    static class TopologyNodes {
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Nodes. */
+        private final Collection<UUID> nodes = new ConcurrentLinkedQueue<>();
+
+        /**
+         * @param topVer Topology version.
+         * @param nodeId Node id.
+         */
+        private TopologyNodes(AffinityTopologyVersion topVer, UUID nodeId) {
+            this.topVer = topVer;
+
+            nodes.add(nodeId);
+        }
+
+        /**
+         * Gets nodes of this topology.
+         */
+        public Iterable<UUID> nodes() {
+            return Collections.unmodifiableCollection(nodes);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
new file mode 100644
index 0000000..84e4074
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.internal.binary.BinaryObjectExImpl;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Affinity mapping (partition to nodes) for each cache.
+ */
+public class ClientCacheAffinityMapping {
+    /** CacheAffinityInfo for caches to which affinity awareness is not applicable. */
+    private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
+        new CacheAffinityInfo(null, null);
+
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Affinity information for each cache. */
+    private final Map<Integer, CacheAffinityInfo> cacheAffinity = new 
HashMap<>();
+
+    /** Unmodifiable view of the cache IDs, exposed to preserve instance 
immutability. */
+    private final Collection<Integer> cacheIds = 
Collections.unmodifiableCollection(cacheAffinity.keySet());
+
+    /**
+     * @param topVer Topology version.
+     */
+    private ClientCacheAffinityMapping(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     * Gets topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * Gets cache IDs.
+     */
+    public Collection<Integer> cacheIds() {
+        return cacheIds;
+    }
+
+    /**
+     * Calculates affinity node for given cache and key.
+     *
+     * @param binary Binary data processor (needed to extract affinity field 
from the key).
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @return Affinity node id or {@code null} if affinity node can't be 
determined for given cache and key.
+     */
+    public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
+        CacheAffinityInfo affinityInfo = cacheAffinity.get(cacheId);
+
+        if (affinityInfo == null || affinityInfo == 
NOT_APPLICABLE_CACHE_AFFINITY_INFO)
+            return null;
+
+        Object binaryKey = binary.toBinary(key);
+
+        if (!affinityInfo.keyCfg.isEmpty()) {
+            int typeId = binary.typeId(key.getClass().getName());
+
+            Integer fieldId = affinityInfo.keyCfg.get(typeId);
+
+            if (fieldId != null) {
+                if (binaryKey instanceof BinaryObjectExImpl)
+                    binaryKey = ((BinaryObjectExImpl)binaryKey).field(fieldId);
+                else // Can't get field value, affinity node can't be 
determined in this case.
+                    return null;
+            }
+        }
+
+        return affinityInfo.nodeForKey(binaryKey);
+    }
+
+    /**
+     * Merge specified mappings into one instance.
+     */
+    public static ClientCacheAffinityMapping merge(ClientCacheAffinityMapping 
... mappings) {
+        assert !F.isEmpty(mappings);
+
+        ClientCacheAffinityMapping res = new 
ClientCacheAffinityMapping(mappings[0].topVer);
+
+        for (ClientCacheAffinityMapping mapping : mappings) {
+            assert res.topVer.equals(mapping.topVer) : "Mappings must have 
identical topology versions [res.topVer=" +
+                res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
+
+            for (Map.Entry<Integer, CacheAffinityInfo> entry : 
mapping.cacheAffinity.entrySet())
+                res.cacheAffinity.put(entry.getKey(), entry.getValue());
+        }
+
+        return res;
+    }
+
+    /**
+     * Writes caches affinity request to the output channel.
+     *
+     * @param ch Output channel.
+     * @param cacheIds Cache IDs.
+     */
+    public static void writeRequest(PayloadOutputChannel ch, 
Collection<Integer> cacheIds) {
+        BinaryOutputStream out = ch.out();
+
+        out.writeInt(cacheIds.size());
+
+        for (int cacheId : cacheIds)
+            out.writeInt(cacheId);
+    }
+
+    /**
+     * Reads caches affinity response from the input channel and creates 
{@code ClientCacheAffinityMapping} instance
+     * from this response.
+     *
+     * @param ch Input channel.
+     */
+    public static ClientCacheAffinityMapping readResponse(PayloadInputChannel 
ch) {
+        try (BinaryReaderExImpl in = new BinaryReaderExImpl(null, ch.in(), 
null, true)) {
+            long topVer = in.readLong();
+            int minorTopVer = in.readInt();
+
+            ClientCacheAffinityMapping aff = new ClientCacheAffinityMapping(
+                new AffinityTopologyVersion(topVer, minorTopVer));
+
+            int mappingsCnt = in.readInt();
+
+            for (int i = 0; i < mappingsCnt; i++) {
+                boolean applicable = in.readBoolean();
+
+                int cachesCnt = in.readInt();
+
+                if (applicable) { // Affinity awareness is applicable for these 
caches.
+                    Map<Integer, Map<Integer, Integer>> cacheKeyCfg = 
U.newHashMap(cachesCnt);
+
+                    for (int j = 0; j < cachesCnt; j++)
+                        cacheKeyCfg.put(in.readInt(), 
readCacheKeyConfiguration(in));
+
+                    UUID[] partToNode = readNodePartitions(in);
+
+                    for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg : 
cacheKeyCfg.entrySet())
+                        aff.cacheAffinity.put(keyCfg.getKey(), new 
CacheAffinityInfo(keyCfg.getValue(), partToNode));
+                }
+                else { // Affinity awareness is not applicable for these caches.
+                    for (int j = 0; j < cachesCnt; j++)
+                        aff.cacheAffinity.put(in.readInt(), 
NOT_APPLICABLE_CACHE_AFFINITY_INFO);
+                }
+            }
+
+            return aff;
+        }
+        catch (IOException e) {
+            throw new ClientError(e);
+        }
+    }
+
+    /**
+     * @param in Input reader.
+     */
+    private static Map<Integer, Integer> 
readCacheKeyConfiguration(BinaryReaderExImpl in) {
+        int keyCfgCnt = in.readInt();
+
+        Map<Integer, Integer> keyCfg = U.newHashMap(keyCfgCnt);
+
+        for (int i = 0; i < keyCfgCnt; i++)
+            keyCfg.put(in.readInt(), in.readInt());
+
+        return keyCfg;
+    }
+
+    /**
+     * @param in Input reader.
+     */
+    private static UUID[] readNodePartitions(BinaryReaderExImpl in) {
+        int nodesCnt = in.readInt();
+
+        int maxPart = -1;
+
+        UUID[] partToNode = new UUID[1024];
+
+        for (int i = 0; i < nodesCnt; i++) {
+            UUID nodeId = in.readUuid();
+
+            int partCnt = in.readInt();
+
+            for (int j = 0; j < partCnt; j++) {
+                int part = in.readInt();
+
+                if (part > maxPart) {
+                    maxPart = part;
+
+                    // Expand partToNode if needed.
+                    if (part >= partToNode.length)
+                        partToNode = Arrays.copyOf(partToNode, U.ceilPow2(part 
+ 1));
+                }
+
+                partToNode[part] = nodeId;
+            }
+        }
+
+        return Arrays.copyOf(partToNode, maxPart + 1);
+    }
+
+    /**
+     * Class to store affinity information for cache.
+     */
+    private static class CacheAffinityInfo {
+        /** Key configuration. */
+        private final Map<Integer, Integer> keyCfg;
+
+        /** Partition mapping. */
+        private final UUID[] partMapping;
+
+        /** Affinity mask. */
+        private final int affinityMask;
+
+        /**
+         * @param keyCfg Cache key configuration or {@code null} if affinity 
awareness is not applicable for this cache.
+         * @param partMapping Partition to node mapping or {@code null} if 
affinity awareness is not applicable for
+         * this cache.
+         */
+        private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] 
partMapping) {
+            this.keyCfg = keyCfg;
+            this.partMapping = partMapping;
+            affinityMask = partMapping != null ? 
RendezvousAffinityFunction.calculateMask(partMapping.length) : 0;
+        }
+
+        /**
+         * Calculates node for given key.
+         *
+         * @param key Key.
+         */
+        private UUID nodeForKey(Object key) {
+            assert partMapping != null;
+
+            int part = RendezvousAffinityFunction.calculatePartition(key, 
affinityMask, partMapping.length);
+
+            return partMapping[part];
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index de7062e..3cb8790 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
  * Processing thin client requests and responses.
@@ -41,4 +43,19 @@ interface ClientChannel extends AutoCloseable {
      * @return Server version.
      */
     public ProtocolVersion serverVersion();
+
+    /**
+     * @return Server node ID.
+     */
+    public UUID serverNodeId();
+
+    /**
+     * @return Server topology version.
+     */
+    public AffinityTopologyVersion serverTopologyVersion();
+
+    /**
+     * Add topology change listener.
+     */
+    public void addTopologyChangeListener(Consumer<ClientChannel> lsnr);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
index eaf5055..6260d62 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
@@ -29,63 +29,69 @@ import org.apache.ignite.client.SslProtocol;
  */
 final class ClientChannelConfiguration {
     /** Host. */
-    private InetSocketAddress addr;
+    private final InetSocketAddress addr;
 
     /** Ssl mode. */
-    private SslMode sslMode;
+    private final SslMode sslMode;
 
     /** Tcp no delay. */
-    private boolean tcpNoDelay;
+    private final boolean tcpNoDelay;
 
     /** Timeout. */
-    private int timeout;
+    private final int timeout;
 
     /** Send buffer size. */
-    private int sndBufSize;
+    private final int sndBufSize;
 
     /** Receive buffer size. */
-    private int rcvBufSize;
+    private final int rcvBufSize;
 
     /** Ssl client certificate key store path. */
-    private String sslClientCertKeyStorePath;
+    private final String sslClientCertKeyStorePath;
 
     /** Ssl client certificate key store type. */
-    private String sslClientCertKeyStoreType;
+    private final String sslClientCertKeyStoreType;
 
     /** Ssl client certificate key store password. */
-    private String sslClientCertKeyStorePwd;
+    private final String sslClientCertKeyStorePwd;
 
     /** Ssl trust certificate key store path. */
-    private String sslTrustCertKeyStorePath;
+    private final String sslTrustCertKeyStorePath;
 
     /** Ssl trust certificate key store type. */
-    private String sslTrustCertKeyStoreType;
+    private final String sslTrustCertKeyStoreType;
 
     /** Ssl trust certificate key store password. */
-    private String sslTrustCertKeyStorePwd;
+    private final String sslTrustCertKeyStorePwd;
 
     /** Ssl key algorithm. */
-    private String sslKeyAlgorithm;
+    private final String sslKeyAlgorithm;
 
     /** Ssl protocol. */
-    private SslProtocol sslProto;
+    private final SslProtocol sslProto;
 
     /** Ssl trust all. */
-    private boolean sslTrustAll;
+    private final boolean sslTrustAll;
 
     /** SSL Context Factory. */
-    private Factory<SSLContext> sslCtxFactory;
+    private final Factory<SSLContext> sslCtxFactory;
 
     /** User. */
-    private String userName;
+    private final String userName;
 
     /** Password. */
-    private String userPwd;
+    private final String userPwd;
+
+    /** Reconnect period (for throttling). */
+    private final long reconnectThrottlingPeriod;
+
+    /** Reconnect retries within period (for throttling). */
+    private final int reconnectThrottlingRetries;
 
     /**
      * Constructor.
      */
-    ClientChannelConfiguration(ClientConfiguration cfg) {
+    ClientChannelConfiguration(ClientConfiguration cfg, InetSocketAddress 
addr) {
         this.sslMode = cfg.getSslMode();
         this.tcpNoDelay = cfg.isTcpNoDelay();
         this.timeout = cfg.getTimeout();
@@ -103,6 +109,9 @@ final class ClientChannelConfiguration {
         this.sslCtxFactory = cfg.getSslContextFactory();
         this.userName = cfg.getUserName();
         this.userPwd = cfg.getUserPassword();
+        this.reconnectThrottlingPeriod = cfg.getReconnectThrottlingPeriod();
+        this.reconnectThrottlingRetries = cfg.getReconnectThrottlingRetries();
+        this.addr = addr;
     }
 
     /**
@@ -113,15 +122,6 @@ final class ClientChannelConfiguration {
     }
 
     /**
-     * @param newVal Address.
-     */
-    public ClientChannelConfiguration setAddress(InetSocketAddress newVal) {
-        addr = newVal;
-
-        return this;
-    }
-
-    /**
      * @return SSL Mode.
      */
     public SslMode getSslMode() {
@@ -239,4 +239,18 @@ final class ClientChannelConfiguration {
     public String getUserPassword() {
         return userPwd;
     }
+
+    /**
+     * @return Reconnect period (for throttling).
+     */
+    public long getReconnectThrottlingPeriod() {
+        return reconnectThrottlingPeriod;
+    }
+
+    /**
+     * @return Reconnect retries within period (for throttling).
+     */
+    public int getReconnectThrottlingRetries() {
+        return reconnectThrottlingRetries;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index c0017f3..9091849 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -44,6 +44,7 @@ enum ClientOperation {
     /** Cache get and replace. */CACHE_GET_AND_REPLACE(1006),
     /** Cache put if absent. */CACHE_PUT_IF_ABSENT(1002),
     /** Cache clear. */CACHE_CLEAR(1013),
+    /** Cache partitions. */CACHE_PARTITIONS(1101),
     /** Query scan. */QUERY_SCAN(2000),
     /** Query scan cursor get page. */QUERY_SCAN_CURSOR_GET_PAGE(2001),
     /** Query sql. */QUERY_SQL(2002),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 2e8deef..abcf484 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -20,14 +20,20 @@ package org.apache.ignite.internal.client.thin;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
@@ -36,28 +42,44 @@ import 
org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.internal.util.HostAndPortRange;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 
 /**
- * Adds failover abd thread-safety to {@link ClientChannel}.
+ * Communication channel with failover and affinity awareness.
  */
 final class ReliableChannel implements AutoCloseable {
-    /** Raw channel. */
-    private final Function<ClientChannelConfiguration, Result<ClientChannel>> 
chFactory;
+    /** Channel factory. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory;
 
-    /** Servers count. */
-    private final int srvCnt;
+    /** Client channel holders for each configured address. */
+    private final ClientChannelHolder[] channels;
 
-    /** Primary server. */
-    private InetSocketAddress primary;
+    /** Index of the current channel. */
+    private int curChIdx;
 
-    /** Backup servers. */
-    private final Deque<InetSocketAddress> backups = new LinkedList<>();
+    /** Affinity awareness enabled. */
+    private final boolean affinityAwarenessEnabled;
 
-    /** Channel. */
-    private ClientChannel ch;
+    /** Cache affinity awareness context. */
+    private final ClientCacheAffinityContext affinityCtx;
 
-    /** Ignite config. */
-    private final ClientConfiguration clientCfg;
+    /** Node channels. */
+    private final Map<UUID, ClientChannelHolder> nodeChannels = new 
ConcurrentHashMap<>();
+
+    /** Async tasks thread pool. */
+    private final ExecutorService asyncRunner = 
Executors.newSingleThreadExecutor(
+        new ThreadFactory() {
+            @Override public Thread newThread(@NotNull Runnable r) {
+                return new Thread(r, "thin-client-channel-async-runner");
+            }
+        }
+    );
+
+    /** Channels reinit was scheduled. */
+    private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
+
+    /** Affinity map update is in progress. */
+    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
 
     /** Channel is closed. */
     private boolean closed;
@@ -66,8 +88,9 @@ final class ReliableChannel implements AutoCloseable {
      * Constructor.
      */
     ReliableChannel(
-        Function<ClientChannelConfiguration, Result<ClientChannel>> chFactory,
-        ClientConfiguration clientCfg
+        Function<ClientChannelConfiguration, ClientChannel> chFactory,
+        ClientConfiguration clientCfg,
+        IgniteBinary binary
     ) throws ClientException {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
@@ -76,30 +99,34 @@ final class ReliableChannel implements AutoCloseable {
             throw new NullPointerException("clientCfg");
 
         this.chFactory = chFactory;
-        this.clientCfg = clientCfg;
 
         List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
 
-        srvCnt = addrs.size();
+        channels = new ClientChannelHolder[addrs.size()];
 
-        primary = addrs.get(new Random().nextInt(addrs.size())); // we already 
verified there is at least one address
+        for (int i = 0; i < channels.length; i++)
+            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
 
-        for (InetSocketAddress a : addrs) {
-            if (a != primary)
-                backups.add(a);
-        }
+        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+
+        affinityAwarenessEnabled = clientCfg.isAffinityAwarenessEnabled() && 
channels.length > 1;
+
+        affinityCtx = new ClientCacheAffinityContext(binary);
 
         ClientConnectionException lastEx = null;
 
-        for (int i = 0; i < addrs.size(); i++) {
+        for (int i = 0; i < channels.length; i++) {
             try {
-                ch = chFactory.apply(new 
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
+                channels[curChIdx].getOrCreateChannel();
+
+                if (affinityAwarenessEnabled)
+                    initAllChannelsAsync();
 
                 return;
             } catch (ClientConnectionException e) {
                 lastEx = e;
 
-                rollAddress();
+                rollCurrentChannel();
             }
         }
 
@@ -107,14 +134,11 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void close() throws Exception {
+    @Override public synchronized void close() {
         closed = true;
 
-        if (ch != null) {
-            ch.close();
-
-            ch = null;
-        }
+        for (ClientChannelHolder hld : channels)
+            hld.closeChannel();
     }
 
     /**
@@ -127,7 +151,7 @@ final class ReliableChannel implements AutoCloseable {
     ) throws ClientException {
         ClientConnectionException failure = null;
 
-        for (int i = 0; i < srvCnt; i++) {
+        for (int i = 0; i < channels.length; i++) {
             ClientChannel ch = null;
 
             try {
@@ -141,7 +165,7 @@ final class ReliableChannel implements AutoCloseable {
                 else
                     failure.addSuppressed(e);
 
-                changeServer(ch);
+                onChannelFailure(ch);
             }
         }
 
@@ -164,6 +188,97 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /**
+     * Send request to affinity node and handle response.
+     */
+    public <T> T affinityService(
+        int cacheId,
+        Object key,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) throws ClientException {
+        if (affinityAwarenessEnabled && !nodeChannels.isEmpty() && 
affinityInfoIsUpToDate(cacheId)) {
+            UUID affinityNodeId = affinityCtx.affinityNode(cacheId, key);
+
+            if (affinityNodeId != null) {
+                ClientChannelHolder hld = nodeChannels.get(affinityNodeId);
+
+                if (hld != null) {
+                    ClientChannel ch = null;
+
+                    try {
+                        ch = hld.getOrCreateChannel();
+
+                        return ch.service(op, payloadWriter, payloadReader);
+                    }
+                    catch (ClientConnectionException ignore) {
+                        onChannelFailure(hld, ch);
+                    }
+                }
+            }
+        }
+
+        // Can't determine affinity node or request to affinity node failed - 
proceed with standard failover service.
+        return service(op, payloadWriter, payloadReader);
+    }
+
+    /**
+     * Checks if affinity information for the cache is up to date and tries to 
update it if not.
+     *
+     * @return {@code True} if affinity information is up to date, {@code 
false} if there is no affinity information
+     * available for this cache or the information is obsolete and could not be 
updated.
+     */
+    private boolean affinityInfoIsUpToDate(int cacheId) {
+        if (affinityCtx.affinityUpdateRequired(cacheId)) {
+            if (affinityUpdateInProgress.compareAndSet(false, true)) {
+                try {
+                    ClientCacheAffinityContext.TopologyNodes lastTop = 
affinityCtx.lastTopology();
+
+                    if (lastTop == null)
+                        return false;
+
+                    for (UUID nodeId : lastTop.nodes()) {
+                        // Abort iterations when topology changed.
+                        if (lastTop != affinityCtx.lastTopology())
+                            return false;
+
+                        ClientChannelHolder hld = nodeChannels.get(nodeId);
+
+                        if (hld != null) {
+                            ClientChannel ch = null;
+
+                            try {
+                                ch = hld.getOrCreateChannel();
+
+                                return 
ch.service(ClientOperation.CACHE_PARTITIONS,
+                                    affinityCtx::writePartitionsUpdateRequest,
+                                    affinityCtx::readPartitionsUpdateResponse);
+                            }
+                            catch (ClientConnectionException ignore) {
+                                onChannelFailure(hld, ch);
+                            }
+                        }
+                    }
+
+                    // No alive node was found for the last topology 
version, so we should reset the affinity context
+                    // to let affinity get updated in case of reconnection to 
the new cluster (with lower topology
+                    // version).
+                    affinityCtx.reset(lastTop);
+                }
+                finally {
+                    affinityUpdateInProgress.set(false);
+                }
+            }
+
+            // No suitable nodes found to update affinity, failed to execute 
service on all nodes or update is already
+            // in progress by another thread.
+            return false;
+        }
+        else
+            return true;
+    }
+
+    /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
     private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
@@ -199,34 +314,157 @@ final class ReliableChannel implements AutoCloseable {
         if (closed)
             throw new ClientException("Channel is closed");
 
-        if (ch == null) {
-            try {
-                ch = chFactory.apply(new 
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
-            }
-            catch (ClientConnectionException e) {
-                rollAddress();
-
-                throw e;
-            }
+        try {
+            return channels[curChIdx].getOrCreateChannel();
         }
+        catch (ClientConnectionException e) {
+            rollCurrentChannel();
 
-        return ch;
+            throw e;
+        }
     }
 
     /** */
-    private void rollAddress() {
-        if (!backups.isEmpty()) {
-            backups.addLast(primary);
+    private synchronized void rollCurrentChannel() {
+        if (++curChIdx >= channels.length)
+            curChIdx = 0;
+    }
 
-            primary = backups.removeFirst();
+    /**
+     * On current channel failure.
+     */
+    private synchronized void onChannelFailure(ClientChannel ch) {
+        // There is nothing wrong if curChIdx was concurrently changed, since 
channel was closed by another thread
+        // when current index was changed and no other wrong channel will be 
closed by current thread because
+        // onChannelFailure checks the channel bound to the holder before closing 
it.
+        onChannelFailure(channels[curChIdx], ch);
+    }
+
+    /**
+     * On channel of the specified holder failure.
+     */
+    private synchronized void onChannelFailure(ClientChannelHolder hld, 
ClientChannel ch) {
+        if (ch == hld.ch && ch != null) {
+            hld.closeChannel();
+
+            if (hld == channels[curChIdx])
+                rollCurrentChannel();
         }
     }
 
-    /** */
-    private synchronized void changeServer(ClientChannel oldCh) {
-        if (oldCh == ch && ch != null) {
-            rollAddress();
+    /**
+     * Asynchronously try to establish a connection to all configured servers.
+     */
+    private void initAllChannelsAsync() {
+        // Skip if a channels reinit is already scheduled.
+        if (scheduledChannelsReinit.compareAndSet(false, true)) {
+            asyncRunner.submit(
+                () -> {
+                    scheduledChannelsReinit.set(false);
+
+                    for (ClientChannelHolder hld : channels) {
+                        if (scheduledChannelsReinit.get())
+                            return; // New reinit task scheduled.
+
+                        try {
+                            hld.getOrCreateChannel(true);
+                        }
+                        catch (Exception ignore) {
+                            // No-op.
+                        }
+                    }
+                }
+            );
+        }
+    }
+
+    /**
+     * Topology version change detected on the channel.
+     *
+     * @param ch Channel.
+     */
+    private void onTopologyChanged(ClientChannel ch) {
+        if (affinityAwarenessEnabled && 
affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(),
+            ch.serverNodeId()))
+            initAllChannelsAsync();
+    }
+
+    /**
+     * Channels holder.
+     */
+    private class ClientChannelHolder {
+        /** Channel configuration. */
+        private final ClientChannelConfiguration chCfg;
+
+        /** Channel. */
+        private volatile ClientChannel ch;
+
+        /** Timestamps of reconnect retries. */
+        private final long[] reconnectRetries;
+
+        /**
+         * @param chCfg Channel config.
+         */
+        private ClientChannelHolder(ClientChannelConfiguration chCfg) {
+            this.chCfg = chCfg;
+
+            reconnectRetries = chCfg.getReconnectThrottlingRetries() > 0 && 
chCfg.getReconnectThrottlingPeriod() > 0L ?
+                new long[chCfg.getReconnectThrottlingRetries()] : null;
+        }
+
+        /**
+         * @return Whether reconnect throttling should be applied.
+         */
+        private boolean applyReconnectionThrottling() {
+            if (reconnectRetries == null)
+                return false;
+
+            long ts = System.currentTimeMillis();
+
+            for (int i = 0; i < reconnectRetries.length; i++) {
+                if (ts - reconnectRetries[i] >= 
chCfg.getReconnectThrottlingPeriod()) {
+                    reconnectRetries[i] = ts;
+
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        /**
+         * Get or create channel.
+         */
+        private synchronized ClientChannel getOrCreateChannel() {
+            return getOrCreateChannel(false);
+        }
+
+        /**
+         * Get or create channel.
+         */
+        private synchronized ClientChannel getOrCreateChannel(boolean 
ignoreThrottling) {
+            if (ch == null) {
+                if (!ignoreThrottling && applyReconnectionThrottling())
+                    throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
+
+                ch = chFactory.apply(chCfg);
+
+                if (ch.serverNodeId() != null) {
+                    
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+
+                    nodeChannels.values().remove(this);
+
+                    nodeChannels.putIfAbsent(ch.serverNodeId(), this);
+                }
+            }
+
+            return ch;
+        }
 
+        /**
+         * Close channel.
+         */
+        private synchronized void closeChannel() {
             U.closeQuiet(ch);
 
             ch = null;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java
deleted file mode 100644
index 865599f..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/Result.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.thin;
-
-import org.apache.ignite.client.ClientException;
-
-/**
- * Result of a function throwing an exception.
- */
-final class Result<T> {
-    /** Value. */
-    private final T val;
-
-    /** Exception. */
-    private final ClientException ex;
-
-    /**
-     * Initializes a successful result.
-     */
-    Result(T val) {
-        this.val = val;
-        ex = null;
-    }
-
-    /**
-     * Initializes a failed result.
-     */
-    Result(ClientException ex) {
-        if (ex == null)
-            throw new NullPointerException("ex");
-
-        this.ex = ex;
-        val = null;
-    }
-
-    /**
-     * @return Value;
-     */
-    public T get() throws ClientException {
-        if (ex != null)
-            throw ex;
-
-        return val;
-    }
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index e46bb16..fa07556 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.cache.Cache;
 import org.apache.ignite.cache.CachePeekMode;
@@ -89,12 +90,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (key == null)
             throw new NullPointerException("key");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_GET,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-            },
+            null,
             this::readObject
         );
     }
@@ -107,13 +106,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (val == null)
             throw new NullPointerException("val");
 
-        ch.request(
+        cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_PUT,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, val);
-            }
+            req -> writeObject(req, val),
+            null
         );
     }
 
@@ -122,12 +119,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (key == null)
             throw new NullPointerException("key");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_CONTAINS_KEY,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-            },
+            null,
             res -> res.in().readBoolean()
         );
     }
@@ -220,11 +215,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (newVal == null)
             throw new NullPointerException("newVal");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_REPLACE_IF_EQUALS,
             req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
                 writeObject(req, oldVal);
                 writeObject(req, newVal);
             },
@@ -240,13 +234,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (val == null)
             throw new NullPointerException("val");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_REPLACE,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, val);
-            },
+            req -> writeObject(req, val),
             res -> res.in().readBoolean()
         );
     }
@@ -256,12 +247,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (key == null)
             throw new NullPointerException("key");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_REMOVE_KEY,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-            },
+            null,
             res -> res.in().readBoolean()
         );
     }
@@ -274,13 +263,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (oldVal == null)
             throw new NullPointerException("oldVal");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_REMOVE_IF_EQUALS,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, oldVal);
-            },
+            req -> writeObject(req, oldVal),
             res -> res.in().readBoolean()
         );
     }
@@ -315,13 +301,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (val == null)
             throw new NullPointerException("val");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_GET_AND_PUT,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, val);
-            },
+            req -> writeObject(req, val),
             this::readObject
         );
     }
@@ -331,12 +314,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (key == null)
             throw new NullPointerException("key");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_GET_AND_REMOVE,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-            },
+            null,
             this::readObject
         );
     }
@@ -349,13 +330,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (val == null)
             throw new NullPointerException("val");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_GET_AND_REPLACE,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, val);
-            },
+            req -> writeObject(req, val),
             this::readObject
         );
     }
@@ -368,13 +346,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (val == null)
             throw new NullPointerException("val");
 
-        return ch.service(
+        return cacheSingleKeyOperation(
+            key,
             ClientOperation.CACHE_PUT_IF_ABSENT,
-            req -> {
-                writeCacheInfo(req);
-                writeObject(req, key);
-                writeObject(req, val);
-            },
+            req -> writeObject(req, val),
             res -> res.in().readBoolean()
         );
     }
@@ -506,6 +481,30 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         ));
     }
 
+
+    /**
+     * Execute cache operation with a single key.
+     */
+    private <T> T cacheSingleKeyOperation(
+        K key,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> additionalPayloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) throws ClientException {
+        Consumer<PayloadOutputChannel> payloadWriter = req -> {
+            writeCacheInfo(req);
+            writeObject(req, key);
+
+            if (additionalPayloadWriter != null)
+                additionalPayloadWriter.accept(req);
+        };
+
+        // Transactional operation cannot be executed on affinity node, it 
should be executed on the node that started
+        // the transaction.
+        return transactions.tx() == null ? ch.affinityService(cacheId, key, 
op, payloadWriter, payloadReader) :
+            ch.service(op, payloadWriter, payloadReader);
+    }
+
     /** Write cache ID and flags. */
     private void writeCacheInfo(PayloadOutputChannel payloadCh) {
         BinaryOutputStream out = payloadCh.out();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index e31c614..e23e20b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -35,7 +35,9 @@ import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -70,6 +72,7 @@ import 
org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.platform.client.ClientFlag;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -101,6 +104,12 @@ class TcpClientChannel implements ClientChannel {
     /** Protocol version agreed with the server. */
     private ProtocolVersion ver = CURRENT_VER;
 
+    /** Server node ID. */
+    private UUID srvNodeId;
+
+    /** Server topology version. */
+    private AffinityTopologyVersion srvTopVer;
+
     /** Channel. */
     private final Socket sock;
 
@@ -122,6 +131,9 @@ class TcpClientChannel implements ClientChannel {
     /** Pending requests. */
     private final Map<Long, ClientRequestFuture> pendingReqs = new 
ConcurrentHashMap<>();
 
+    /** Topology change listeners. */
+    private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new 
CopyOnWriteArrayList<>();
+
     /** Constructor. */
     TcpClientChannel(ClientChannelConfiguration cfg) throws 
ClientConnectionException, ClientAuthenticationException {
         validateConfiguration(cfg);
@@ -277,9 +289,13 @@ class TcpClientChannel implements ClientChannel {
             short flags = dataInput.readShort();
 
             if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
-                // TODO: IGNITE-11898 Implement Best Effort Affinity for java 
thin client.
-                dataInput.readLong(); // topVer.
-                dataInput.readInt(); // minorTopVer.
+                long topVer = dataInput.readLong();
+                int minorTopVer = dataInput.readInt();
+
+                srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
+
+                for (Consumer<ClientChannel> lsnr : topChangeLsnrs)
+                    lsnr.accept(this);
             }
 
             if ((flags & ClientFlag.ERROR) != 0)
@@ -315,6 +331,21 @@ class TcpClientChannel implements ClientChannel {
         return ver;
     }
 
+    /** {@inheritDoc} */
+    @Override public UUID serverNodeId() {
+        return srvNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion serverTopologyVersion() {
+        return srvTopVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addTopologyChangeListener(Consumer<ClientChannel> 
lsnr) {
+        topChangeLsnrs.add(lsnr);
+    }
+
     /** Validate {@link ClientConfiguration}. */
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
@@ -402,8 +433,7 @@ class TcpClientChannel implements ClientChannel {
         try (BinaryReaderExImpl r = new BinaryReaderExImpl(null, res, null, 
true)) {
             if (res.readBoolean()) { // Success flag.
                 if (ver.compareTo(V1_4_0) >= 0)
-                    // TODO: IGNITE-11898 Implement Best Effort Affinity for 
java thin client.
-                    r.readUuid(); // Server node UUID.
+                    srvNodeId = r.readUuid(); // Server node UUID.
             }
             else {
                 ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), 
res.readShort(), res.readShort());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 2f6e432..b6226a1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -75,20 +75,19 @@ public class TcpIgniteClient implements IgniteClient {
 
     /**
      * Private constructor. Use {@link 
TcpIgniteClient#start(ClientConfiguration)} to create an instance of
-     * {@link TcpClientChannel}.
+     * {@code TcpIgniteClient}.
      */
     private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
-        Function<ClientChannelConfiguration, Result<ClientChannel>> chFactory 
= chCfg -> {
-            try {
-                return new Result<>(new TcpClientChannel(chCfg));
-            }
-            catch (ClientException e) {
-                return new Result<>(e);
-            }
-        };
-
-        ch = new ReliableChannel(chFactory, cfg);
+        this(TcpClientChannel::new, cfg);
+    }
 
+    /**
+     * Constructor with custom channel factory.
+     */
+    TcpIgniteClient(
+        Function<ClientChannelConfiguration, ClientChannel> chFactory,
+        ClientConfiguration cfg
+    ) throws ClientException {
         marsh = new ClientBinaryMarshaller(new ClientBinaryMetadataHandler(), 
new ClientMarshallerContext());
 
         marsh.setBinaryConfiguration(cfg.getBinaryConfiguration());
@@ -97,6 +96,8 @@ public class TcpIgniteClient implements IgniteClient {
 
         binary = new ClientBinary(marsh);
 
+        ch = new ReliableChannel(chFactory, cfg, binary);
+
         transactions = new TcpClientTransactions(ch, marsh,
             new 
ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 1bb3698..a746c3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -61,6 +61,7 @@ public class ReliabilityTest extends GridCommonAbstractTest {
 
         try (LocalIgniteCluster cluster = 
LocalIgniteCluster.start(CLUSTER_SIZE);
              IgniteClient client = Ignition.startClient(new 
ClientConfiguration()
+                 .setReconnectThrottlingRetries(0) // Disable throttling.
                  .setAddresses(cluster.clientAddresses().toArray(new 
String[CLUSTER_SIZE]))
              )
         ) {
@@ -262,6 +263,51 @@ public class ReliabilityTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test reconnection throttling.
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testReconnectionThrottling() throws Exception {
+        int throttlingRetries = 5;
+        long throttlingPeriod = 3_000L;
+
+        try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
+             IgniteClient client = Ignition.startClient(new 
ClientConfiguration()
+                 .setReconnectThrottlingPeriod(throttlingPeriod)
+                 .setReconnectThrottlingRetries(throttlingRetries)
+                 .setAddresses(cluster.clientAddresses().toArray(new 
String[1])))
+        ) {
+            ClientCache<Integer, Integer> cache = client.createCache("cache");
+
+            for (int i = 0; i < throttlingRetries; i++) {
+                // Attempts to reconnect within throttlingRetries should pass.
+                cache.put(0, 0);
+
+                dropAllThinClientConnections(Ignition.allGrids().get(0));
+
+                GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), 
ClientConnectionException.class);
+            }
+
+            for (int i = 0; i < 10; i++) // Attempts to reconnect after 
throttlingRetries should fail.
+                GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), 
ClientConnectionException.class);
+
+            doSleep(throttlingPeriod);
+
+            // Attempt to reconnect after throttlingPeriod should pass.
+            assertTrue(GridTestUtils.waitForCondition(() -> {
+                try {
+                    cache.put(0, 0);
+
+                    return true;
+                }
+                catch (ClientConnectionException e) {
+                    return false;
+                }
+            }, throttlingPeriod));
+        }
+    }
+
+    /**
      * Drop all thin client connections on given Ignite instance.
      *
      * @param ignite Ignite.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
new file mode 100644
index 0000000..d26815d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractAffinityAwarenessTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+
+/**
+ * Abstract thin client affinity awareness test.
+ */
+public abstract class ThinClientAbstractAffinityAwarenessTest extends 
GridCommonAbstractTest {
+    /** Wait timeout. */
+    private static final long WAIT_TIMEOUT = 5_000L;
+
+    /** Replicated cache name. */
+    protected static final String REPL_CACHE_NAME = "replicated_cache";
+
+    /** Partitioned cache name. */
+    protected static final String PART_CACHE_NAME = "partitioned_cache";
+
+    /** Partitioned with custom affinity cache name. */
+    protected static final String PART_CUSTOM_AFFINITY_CACHE_NAME = 
"partitioned_custom_affinity_cache";
+
+    /** Keys count. */
+    protected static final int KEY_CNT = 30;
+
+    /** Max cluster size. */
+    protected static final int MAX_CLUSTER_SIZE = 4;
+
+    /** Channels. */
+    protected final TestTcpClientChannel[] channels = new 
TestTcpClientChannel[MAX_CLUSTER_SIZE];
+
+    /** Operations queue. */
+    protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue 
= new ConcurrentLinkedQueue<>();
+
+    /** Default channel. */
+    protected TestTcpClientChannel dfltCh;
+
+    /** Client instance. */
+    protected IgniteClient client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        CacheConfiguration ccfg0 = new CacheConfiguration()
+            .setName(REPL_CACHE_NAME)
+            .setCacheMode(CacheMode.REPLICATED);
+
+        CacheConfiguration ccfg1 = new CacheConfiguration()
+            .setName(PART_CUSTOM_AFFINITY_CACHE_NAME)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAffinity(new CustomAffinityFunction());
+
+        CacheConfiguration ccfg2 = new CacheConfiguration()
+            .setName(PART_CACHE_NAME)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setKeyConfiguration(
+                new 
CacheKeyConfiguration(TestNotAnnotatedAffinityKey.class.getName(), 
"affinityKey"),
+                new CacheKeyConfiguration(TestAnnotatedAffinityKey.class));
+
+        return cfg.setCacheConfiguration(ccfg0, ccfg1, ccfg2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        opsQueue.clear();
+    }
+
+    /**
+     * Checks that operation goes through specified channel.
+     */
+    protected void assertOpOnChannel(TestTcpClientChannel expCh, 
ClientOperation expOp) {
+        T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+
+        assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", 
expOp=" + expOp + ']', nextChOp);
+
+        assertEquals("Unexpected channel for opertation [expCh=" + expCh + ", 
expOp=" + expOp +
+            ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+
+        assertEquals("Unexpected operation on channel [expCh=" + expCh + ", 
expOp=" + expOp +
+            ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
+    }
+
+    /**
+     * Calculates affinity channel for cache and key.
+     */
+    protected TestTcpClientChannel affinityChannel(Object key, 
IgniteInternalCache<Object, Object> cache) {
+        Collection<ClusterNode> nodes = 
cache.affinity().mapKeyToPrimaryAndBackups(key);
+
+        UUID nodeId = nodes.iterator().next().id();
+
+        for (int i = 0; i < channels.length; i++) {
+            if (channels[i] != null && 
nodeId.equals(channels[i].serverNodeId()))
+                return channels[i];
+        }
+
+        return dfltCh;
+    }
+
+    /**
+     * @param nodeIdxs Node indexes to connect to.
+     */
+    protected ClientConfiguration getClientConfiguration(int... nodeIdxs) {
+        String addrs[] = Arrays.stream(nodeIdxs).mapToObj(nodeIdx -> 
"127.0.0.1:" + (DFLT_PORT + nodeIdx))
+            .toArray(String[]::new);
+
+        return new 
ClientConfiguration().setAddresses(addrs).setAffinityAwarenessEnabled(true);
+    }
+
+    /**
+     * @param clientCfg Client configuration.
+     * @param chIdxs Channels to wait for initialization.
+     */
+    protected void initClient(ClientConfiguration clientCfg, int... chIdxs) 
throws IgniteInterruptedCheckedException {
+        client = new TcpIgniteClient(cfg -> {
+            try {
+                log.info("Establishing connection to " + cfg.getAddress());
+
+                TcpClientChannel ch = new TestTcpClientChannel(cfg);
+
+                log.info("Channel initialized: " + ch);
+
+                return ch;
+            }
+            catch (Exception e) {
+                log.warning("Failed to initialize channel: " + e.getMessage());
+
+                throw e;
+            }
+        }, clientCfg);
+
+        awaitChannelsInit(chIdxs);
+
+        initDefaultChannel();
+    }
+
+    /**
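+     * Determines the default channel by sending a non-affinity request and recording the channel that served it.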
+     */
+    protected void initDefaultChannel() {
+        opsQueue.clear();
+
+        // Send non-affinity request to determine default channel.
+        client.getOrCreateCache(REPL_CACHE_NAME);
+
+        T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+
+        assertNotNull(nextChOp);
+
+        assertEquals(nextChOp.get2(), 
ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+
+        dfltCh = nextChOp.get1();
+    }
+
+    /**
+     * @param chIdxs Channel idxs.
+     */
+    protected void awaitChannelsInit(int... chIdxs) throws 
IgniteInterruptedCheckedException {
+        // Wait until all channels initialized.
+        for (int ch : chIdxs) {
+            assertTrue("Failed to wait for channel[" + ch + "] init",
+                GridTestUtils.waitForCondition(() -> channels[ch] != null, 
WAIT_TIMEOUT));
+        }
+    }
+
+    /**
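+     * Subclass of the rendezvous affinity function, used to configure a cache with a custom affinity function.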
+     */
+    private static class CustomAffinityFunction extends 
RendezvousAffinityFunction {
+        // No-op.
+    }
+
+    /**
+     * Test class without affinity key.
+     */
+    protected static class TestComplexKey {
+        /** */
+        int firstField;
+
+        /** Another field. */
+        int secondField;
+
+        /** */
+        public TestComplexKey(int firstField, int secondField) {
+            this.firstField = firstField;
+            this.secondField = secondField;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return firstField + secondField;
+        }
+    }
+
+    /**
+     * Test class with annotated affinity key.
+     */
+    protected static class TestAnnotatedAffinityKey {
+        /** */
+        @AffinityKeyMapped
+        int affinityKey;
+
+        /** */
+        int anotherField;
+
+        /** */
+        public TestAnnotatedAffinityKey(int affinityKey, int anotherField) {
+            this.affinityKey = affinityKey;
+            this.anotherField = anotherField;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return affinityKey + anotherField;
+        }
+    }
+
+    /**
+     * Test class with affinity key without annotation.
+     */
+    protected static class TestNotAnnotatedAffinityKey {
+        /** */
+        TestComplexKey affinityKey;
+
+        /** */
+        int anotherField;
+
+        /** */
+        public TestNotAnnotatedAffinityKey(TestComplexKey affinityKey, int 
anotherField) {
+            this.affinityKey = affinityKey;
+            this.anotherField = anotherField;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return affinityKey.hashCode() + anotherField;
+        }
+    }
+
+    /**
+     * Test TCP client channel.
+     */
+    protected class TestTcpClientChannel extends TcpClientChannel {
+        /** Channel configuration. */
+        private final ClientChannelConfiguration cfg;
+
+        /**
+         * @param cfg Config.
+         */
+        public TestTcpClientChannel(ClientChannelConfiguration cfg) {
+            super(cfg);
+
+            this.cfg = cfg;
+
+            int chIdx = cfg.getAddress().getPort() - DFLT_PORT;
+
+            channels[chIdx] = this;
+
+            addTopologyChangeListener(ch -> log.info("Topology change detected 
[ch=" + ch + ", topVer=" +
+                ch.serverTopologyVersion() + ']'));
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T service(ClientOperation op, 
Consumer<PayloadOutputChannel> payloadWriter,
+            Function<PayloadInputChannel, T> payloadReader) throws 
ClientException {
+            T res = super.service(op, payloadWriter, payloadReader);
+
+            // Store all operations except binary type registration in queue 
to check later.
+            if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME &&  op != 
ClientOperation.PUT_BINARY_TYPE)
+                opsQueue.offer(new T2<>(this, op));
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return cfg.getAddress().toString();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
new file mode 100644
index 0000000..7854d4b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessStableTopologyTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+/**
+ * Test affinity awareness of thin client on stable topology.
+ */
+public class ThinClientAffinityAwarenessStableTopologyTest extends 
ThinClientAbstractAffinityAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Add one extra node address to the list, skip 0 node.
+        initClient(getClientConfiguration(1, 2, 3), 1, 2);
+    }
+
+    /**
+     * Test that affinity awareness is not applicable for replicated cache.
+     */
+    @Test
+    public void testReplicatedCache() {
+        testNotApplicableCache(REPL_CACHE_NAME);
+    }
+
+    /**
+     * Test that affinity awareness is not applicable for partitioned cache 
with custom affinity function.
+     */
+    @Test
+    public void testPartitionedCustomAffinityCache() {
+        testNotApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+    }
+
+    /**
+     * Test affinity awareness for all applicable operation types for 
partitioned cache with primitive key.
+     */
+    @Test
+    public void testPartitionedCachePrimitiveKey() {
+        testApplicableCache(PART_CACHE_NAME, i -> i);
+    }
+
+    /**
+     * Test affinity awareness for all applicable operation types for 
partitioned cache with complex key.
+     */
+    @Test
+    public void testPartitionedCacheComplexKey() {
+        testApplicableCache(PART_CACHE_NAME, i -> new TestComplexKey(i, i));
+    }
+
+    /**
+     * Test affinity awareness for all applicable operation types for 
partitioned cache with annotated affinity
+     * mapped key.
+     */
+    @Test
+    public void testPartitionedCacheAnnotatedAffinityKey() {
+        testApplicableCache(PART_CACHE_NAME, i -> new 
TestAnnotatedAffinityKey(i, i));
+    }
+
+    /**
+     * Test affinity awareness for all applicable operation types for 
partitioned cache with not annotated affinity
+     * mapped key.
+     */
+    @Test
+    public void testPartitionedCacheNotAnnotatedAffinityKey() {
+        testApplicableCache(PART_CACHE_NAME, i -> new 
TestNotAnnotatedAffinityKey(new TestComplexKey(i, i), i));
+    }
+
+    /**
+     * Test request to a partition mapped to a node unknown to the client.
+     */
+    @Test
+    public void testPartitionedCacheUnknownNode() throws 
IgniteCheckedException {
+        ClientCache<Object, Object> clientCache = 
client.cache(PART_CACHE_NAME);
+
+        // We did not include grid(0) address in the list of addresses known to 
the client, so the client has no connection
+        // with grid(0).
+        Integer keyForUnknownNode = primaryKey(grid(0).cache(PART_CACHE_NAME));
+
+        assertNotNull("Not found key for node " + grid(0).localNode().id(), 
keyForUnknownNode);
+
+        clientCache.put(keyForUnknownNode, 0);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    private void testNotApplicableCache(String cacheName) {
+        ClientCache<Object, Object> cache = client.cache(cacheName);
+
+        // After first response we should send partitions request on default 
channel together with next request.
+        cache.put(0, 0);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++) {
+            cache.put(i, i);
+
+            assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+            cache.get(i);
+
+            assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET);
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param keyFactory Key factory function.
+     */
+    private void testApplicableCache(String cacheName, Function<Integer, 
Object> keyFactory) {
+        ClientCache<Object, Object> clientCache = client.cache(cacheName);
+        IgniteInternalCache<Object, Object> igniteCache = 
grid(0).context().cache().cache(cacheName);
+
+        clientCache.put(keyFactory.apply(0), 0);
+
+        TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0), 
igniteCache);
+
+        // Default channel is the first who detects topology change, so next 
partition request will go through
+        // the default channel.
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++) {
+            Object key = keyFactory.apply(i);
+
+            opCh = affinityChannel(key, igniteCache);
+
+            clientCache.put(key, key);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+            clientCache.get(key);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_GET);
+
+            clientCache.containsKey(key);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_CONTAINS_KEY);
+
+            clientCache.replace(key, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_REPLACE);
+
+            clientCache.replace(key, i, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_REPLACE_IF_EQUALS);
+
+            clientCache.remove(key);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_REMOVE_KEY);
+
+            clientCache.remove(key, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_REMOVE_IF_EQUALS);
+
+            clientCache.getAndPut(key, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_PUT);
+
+            clientCache.getAndRemove(key);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_REMOVE);
+
+            clientCache.getAndReplace(key, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_GET_AND_REPLACE);
+
+            clientCache.putIfAbsent(key, i);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_PUT_IF_ABSENT);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
new file mode 100644
index 0000000..fcf64d6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAffinityAwarenessUnstableTopologyTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.junit.Test;
+
+/**
+ * Test affinity awareness of thin client on unstable topology.
+ */
+public class ThinClientAffinityAwarenessUnstableTopologyTest extends 
ThinClientAbstractAffinityAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that join of the new node is detected by the client and affects 
affinity awareness.
+     */
+    @Test
+    public void testAffinityAwarenessOnNodeJoin() throws Exception {
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfiguration(1, 2, 3), 1, 2);
+
+        // Test affinity awareness before node join.
+        testAffinityAwareness(true);
+
+        assertNull(channels[3]);
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        // Send non-affinity request to detect topology change.
+        ClientCache<Object, Object> cache = 
client.getOrCreateCache(PART_CACHE_NAME);
+
+        awaitChannelsInit(3);
+
+        assertOpOnChannel(dfltCh, 
ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+
+        Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME));
+
+        assertNotNull("Not found key for node 3", key);
+
+        cache.put(key, 0);
+
+        // Cache partitions are requested from default channel, since it's 
first (and currently the only) channel
+        // which detects new topology.
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+
+        assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT);
+
+        // Test affinity awareness after node join.
+        testAffinityAwareness(false);
+    }
+
+    /**
+     * Test that node left event affects affinity awareness.
+     */
+    @Test
+    public void testAffinityAwarenessOnNodeLeft() throws Exception {
+        startGrids(4);
+
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfiguration(1, 2, 3), 1, 2, 3);
+
+        // Test affinity awareness before node left.
+        testAffinityAwareness(true);
+
+        stopGrid(3);
+
+        channels[3] = null;
+
+        awaitPartitionMapExchange();
+
+        // Next request will also detect topology change.
+        initDefaultChannel();
+
+        // Test affinity awareness after node left.
+        testAffinityAwareness(true);
+    }
+
+    /**
+     * Test connection restore to affinity nodes.
+     */
+    @Test
+    public void testConnectionLoss() throws Exception {
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfiguration(0, 1), 0, 1);
+
+        // Test affinity awareness before connection to node lost.
+        testAffinityAwareness(true);
+
+        // Choose node to disconnect (not default node).
+        int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0;
+
+        // Drop all thin connections from the node.
+        ObjectName mbeanName = U.makeMBeanName(grid(disconnectNodeIdx).name(), 
"Clients",
+            ClientListenerProcessor.class.getSimpleName());
+
+        
MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
 mbeanName,
+            ClientProcessorMXBean.class, true).dropAllConnections();
+
+        channels[disconnectNodeIdx] = null;
+
+        // Send request to disconnected node.
+        ClientCache<Object, Object> cache = client.cache(PART_CACHE_NAME);
+
+        Integer key = 
primaryKey(grid(disconnectNodeIdx).cache(PART_CACHE_NAME));
+
+        assertNotNull("Not found key for node " + disconnectNodeIdx, key);
+
+        cache.put(key, 0);
+
+        // Request goes to default channel, since affinity node is 
disconnected.
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+
+        cache.put(key, 0);
+
+        // Connection to disconnected node should be restored after retry.
+        assertOpOnChannel(channels[disconnectNodeIdx], 
ClientOperation.CACHE_PUT);
+
+        // Test affinity awareness.
+        testAffinityAwareness(false);
+    }
+
+    /**
+     * Test that affinity awareness works when reconnecting to the new cluster 
(with lower topology version).
+     */
+    @Test
+    public void testAffinityAwarenessOnClusterRestart() throws Exception {
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfiguration(0, 1, 2), 0, 1, 2);
+
+        // Test affinity awareness before cluster restart.
+        testAffinityAwareness(true);
+
+        stopAllGrids();
+
+        for (int i = 0; i < channels.length; i++)
+            channels[i] = null;
+
+        // Start 2 grids, so topology version of the new cluster will be less 
than that of the old cluster.
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        // Send any request to failover.
+        try {
+            client.cache(REPL_CACHE_NAME).put(0, 0);
+        }
+        catch (Exception expected) {
+            // No-op.
+        }
+
+        initDefaultChannel();
+
+        awaitChannelsInit(0, 1);
+
+        testAffinityAwareness(true);
+    }
+
+    /**
+     * Checks that each request goes to right node.
+     *
+     * @param partReq Next operation should request partitions map.
+     */
+    private void testAffinityAwareness(boolean partReq) {
+        ClientCache<Object, Object> clientCache = 
client.cache(PART_CACHE_NAME);
+        IgniteInternalCache<Object, Object> igniteCache = 
grid(0).context().cache().cache(PART_CACHE_NAME);
+
+        for (int i = 0; i < KEY_CNT; i++) {
+            TestTcpClientChannel opCh = affinityChannel(i, igniteCache);
+
+            clientCache.put(i, i);
+
+            if (partReq) {
+                assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+
+                partReq = false;
+            }
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+        }
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 80a77ff..b27aba9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.client;
 
+import 
org.apache.ignite.internal.client.thin.ThinClientAffinityAwarenessStableTopologyTest;
+import 
org.apache.ignite.internal.client.thin.ThinClientAffinityAwarenessUnstableTopologyTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -37,7 +39,9 @@ import org.junit.runners.Suite;
     SslParametersTest.class,
     ConnectionTest.class,
     ConnectToStartingNodeTest.class,
-    AsyncChannelTest.class
+    AsyncChannelTest.class,
+    ThinClientAffinityAwarenessStableTopologyTest.class,
+    ThinClientAffinityAwarenessUnstableTopologyTest.class
 })
 public class ClientTestSuite {
     // No-op.

Reply via email to