This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-16771
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-16771 by this push:
     new 47d4df876 add serviceAsync overload with preferredNodeName
47d4df876 is described below

commit 47d4df8763400d952fa58008a4c424b752ea71b5
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Thu Apr 14 14:56:04 2022 +0300

    add serviceAsync overload with preferredNodeName
---
 .../ignite/internal/client/ReliableChannel.java    | 46 +++++++++++++++-------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 5f4bd0310..49cda0aac 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -142,7 +142,7 @@ public final class ReliableChannel implements AutoCloseable 
{
         CompletableFuture<T> fut = new CompletableFuture<>();
 
         // Use the only one attempt to avoid blocking async method.
-        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, 0);
+        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, 
preferredNodeName, null, 0);
 
         return fut;
     }
@@ -180,26 +180,44 @@ public final class ReliableChannel implements 
AutoCloseable {
             int opCode,
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader,
+            String preferredNodeName,
             IgniteClientConnectionException failure,
             int attempt) {
-        ClientChannel ch;
-        try {
-            ch = getDefaultChannel();
-        } catch (Throwable ex) {
-            if (failure != null) {
-                failure.addSuppressed(ex);
+        ClientChannel ch = null;
 
-                fut.completeExceptionally(failure);
+        if (preferredNodeName != null) {
+            var holder = nodeChannels.get(preferredNodeName);
 
-                return;
+            if (holder != null) {
+                try {
+                    ch = holder.getOrCreateChannel();
+                } catch (Throwable ignored) {
+                    // Ignore.
+                }
             }
+        }
+
+        if (ch == null) {
+            try {
+                ch = getDefaultChannel();
+            } catch (Throwable ex) {
+                if (failure != null) {
+                    failure.addSuppressed(ex);
 
-            fut.completeExceptionally(ex);
+                    fut.completeExceptionally(failure);
 
-            return;
+                    return;
+                }
+
+                fut.completeExceptionally(ex);
+
+                return;
+            }
         }
 
-        ch
+        final ClientChannel ch0 = ch;
+
+        ch0
                 .serviceAsync(opCode, payloadWriter, payloadReader)
                 .handle((res, err) -> {
                     if (err == null) {
@@ -219,7 +237,7 @@ public final class ReliableChannel implements AutoCloseable 
{
 
                         try {
                             // Will try to reinit channels if topology changed.
-                            onChannelFailure(ch);
+                            onChannelFailure(ch0);
                         } catch (Throwable ex) {
                             fut.completeExceptionally(ex);
 
@@ -233,7 +251,7 @@ public final class ReliableChannel implements AutoCloseable 
{
                         }
 
                         if (shouldRetry(opCode, attempt, connectionErr)) {
-                            handleServiceAsync(fut, opCode, payloadWriter, 
payloadReader, failure0, attempt + 1);
+                            handleServiceAsync(fut, opCode, payloadWriter, 
payloadReader, null, failure0, attempt + 1);
 
                             return null;
                         }

Reply via email to