This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2c08b950d5 IGNITE-20828 Do not retry attempts to unsubscribe in TopologyAwareRaftGroupService (#2828)
2c08b950d5 is described below
commit 2c08b950d533e06de7654eb8711a4a94b05ce83b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 10 17:27:42 2023 +0400
IGNITE-20828 Do not retry attempts to unsubscribe in TopologyAwareRaftGroupService (#2828)
---
.../raft/client/TopologyAwareRaftGroupService.java | 62 +++++++++++++---------
1 file changed, 36 insertions(+), 26 deletions(-)
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index da51581fb7..96fc615916 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -209,38 +209,48 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
*/
private void sendWithRetry(ClusterNode node, SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
clusterService.messagingService().invoke(node, msg, raftConfiguration.responseTimeout().value()).whenCompleteAsync((unused, th) -> {
- if (th != null) {
- if (recoverable(th)) {
- logicalTopologyService.logicalTopologyOnLeader().whenCompleteAsync((logicalTopologySnapshot, topologyGetIssue) -> {
- if (topologyGetIssue != null) {
- LOG.error("Actual logical topology snapshot was not got.", topologyGetIssue);
-
- msgSendFut.completeExceptionally(topologyGetIssue);
-
- return;
- }
-
- if (logicalTopologySnapshot.nodes().contains(node)) {
- sendWithRetry(node, msg, msgSendFut);
- } else {
- LOG.info("Could not subscribe to leader update from a specific node, because the node had left from the"
- + " cluster [node={}]", node);
-
- msgSendFut.complete(false);
- }
- }, executor);
+ if (th == null) {
+ msgSendFut.complete(true);
+
+ return;
+ }
+
+ if (!msg.subscribe()) {
+ // We don't want to propagate exceptions when unsubscribing (if it's not an Error!).
+
+ if (th instanceof Error) {
+ msgSendFut.completeExceptionally(th);
} else {
- if (!(th instanceof NodeStoppingException)) {
- LOG.error("Could not send the subscribe message to the node [node={}, msg={}]", th, node, msg);
+ LOG.debug("An exception while trying to unsubscribe", th);
+
+ msgSendFut.complete(false);
+ }
+ } else if (recoverable(th)) {
+ logicalTopologyService.logicalTopologyOnLeader().whenCompleteAsync((logicalTopologySnapshot, topologyGetIssue) -> {
+ if (topologyGetIssue != null) {
+ LOG.error("Actual logical topology snapshot was not got.", topologyGetIssue);
+
+ msgSendFut.completeExceptionally(topologyGetIssue);
+
+ return;
}
- msgSendFut.completeExceptionally(th);
+ if (logicalTopologySnapshot.nodes().contains(node)) {
+ sendWithRetry(node, msg, msgSendFut);
+ } else {
+ LOG.info("Could not subscribe to leader update from a specific node, because the node had left from the"
+ + " cluster [node={}]", node);
+
+ msgSendFut.complete(false);
+ }
+ }, executor);
+ } else {
+ if (!(th instanceof NodeStoppingException)) {
+ LOG.error("Could not send the subscribe message to the node [node={}, msg={}]", th, node, msg);
}
- return;
+ msgSendFut.completeExceptionally(th);
}
-
- msgSendFut.complete(true);
}, executor);
}