This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c08b950d5 IGNITE-20828 Do not retry attempts to unsubscribe in TopologyAwareRaftGroupService (#2828)
2c08b950d5 is described below

commit 2c08b950d533e06de7654eb8711a4a94b05ce83b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 10 17:27:42 2023 +0400

    IGNITE-20828 Do not retry attempts to unsubscribe in TopologyAwareRaftGroupService (#2828)
---
 .../raft/client/TopologyAwareRaftGroupService.java | 62 +++++++++++++---------
 1 file changed, 36 insertions(+), 26 deletions(-)

diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index da51581fb7..96fc615916 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -209,38 +209,48 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
      */
     private void sendWithRetry(ClusterNode node, 
SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
         clusterService.messagingService().invoke(node, msg, 
raftConfiguration.responseTimeout().value()).whenCompleteAsync((unused, th) -> {
-            if (th != null) {
-                if (recoverable(th)) {
-                    
logicalTopologyService.logicalTopologyOnLeader().whenCompleteAsync((logicalTopologySnapshot,
 topologyGetIssue) -> {
-                        if (topologyGetIssue != null) {
-                            LOG.error("Actual logical topology snapshot was 
not got.", topologyGetIssue);
-
-                            msgSendFut.completeExceptionally(topologyGetIssue);
-
-                            return;
-                        }
-
-                        if (logicalTopologySnapshot.nodes().contains(node)) {
-                            sendWithRetry(node, msg, msgSendFut);
-                        } else {
-                            LOG.info("Could not subscribe to leader update 
from a specific node, because the node had left from the"
-                                    + " cluster [node={}]", node);
-
-                            msgSendFut.complete(false);
-                        }
-                    }, executor);
+            if (th == null) {
+                msgSendFut.complete(true);
+
+                return;
+            }
+
+            if (!msg.subscribe()) {
+                // We don't want to propagate exceptions when unsubscribing 
(if it's not an Error!).
+
+                if (th instanceof Error) {
+                    msgSendFut.completeExceptionally(th);
                 } else {
-                    if (!(th instanceof NodeStoppingException)) {
-                        LOG.error("Could not send the subscribe message to the 
node [node={}, msg={}]", th, node, msg);
+                    LOG.debug("An exception while trying to unsubscribe", th);
+
+                    msgSendFut.complete(false);
+                }
+            } else if (recoverable(th)) {
+                
logicalTopologyService.logicalTopologyOnLeader().whenCompleteAsync((logicalTopologySnapshot,
 topologyGetIssue) -> {
+                    if (topologyGetIssue != null) {
+                        LOG.error("Actual logical topology snapshot was not 
got.", topologyGetIssue);
+
+                        msgSendFut.completeExceptionally(topologyGetIssue);
+
+                        return;
                     }
 
-                    msgSendFut.completeExceptionally(th);
+                    if (logicalTopologySnapshot.nodes().contains(node)) {
+                        sendWithRetry(node, msg, msgSendFut);
+                    } else {
+                        LOG.info("Could not subscribe to leader update from a 
specific node, because the node had left from the"
+                                + " cluster [node={}]", node);
+
+                        msgSendFut.complete(false);
+                    }
+                }, executor);
+            } else {
+                if (!(th instanceof NodeStoppingException)) {
+                    LOG.error("Could not send the subscribe message to the 
node [node={}, msg={}]", th, node, msg);
                 }
 
-                return;
+                msgSendFut.completeExceptionally(th);
             }
-
-            msgSendFut.complete(true);
         }, executor);
     }
 

Reply via email to