This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch fix/add-confignode-idempotent-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8a2485aee4d9c6c7d2429138fa7760d06d43ec9d
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 9 14:51:22 2026 +0800

    Fix AddConfigNode retry idempotency
---
 .../manager/consensus/ConsensusManager.java        |  23 +++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   9 +-
 .../manager/consensus/ConsensusManagerTest.java    | 116 +++++++++++++++++++++
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  47 +++++++++
 4 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 8b4eeed5a1b..84594b0d7a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -44,6 +45,9 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -86,6 +90,12 @@ public class ConsensusManager {
     setConsensusLayer(stateMachine);
   }
 
+  @TestOnly
+  ConsensusManager(IManager configManager, IConsensus consensusImpl) {
+    this.configManager = configManager;
+    this.consensusImpl = consensusImpl;
+  }
+
   public void start() throws IOException {
     consensusImpl.start();
     if (SystemPropertiesUtils.isRestarted()) {
@@ -289,7 +299,11 @@ public class ConsensusManager {
               configNodeLocation.getConfigNodeId(),
               configNodeLocation.getConsensusEndPoint()));
     }
-    consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
+    try {
+      consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
+    } catch (ConsensusGroupAlreadyExistException e) {
+      LOGGER.info("ConfigNode local peer has already been created: {}", e.getMessage());
+    }
   }
 
   /**
@@ -306,6 +320,9 @@ public class ConsensusManager {
               DEFAULT_CONSENSUS_GROUP_ID,
               configNodeLocation.getConfigNodeId(),
               configNodeLocation.getConsensusEndPoint()));
+    } catch (PeerAlreadyInConsensusGroupException e) {
+      LOGGER.info(
+          "ConfigNode peer {} has already been added: {}", configNodeLocation, e.getMessage());
     } catch (ConsensusException e) {
       throw new AddPeerException(configNodeLocation);
     }
@@ -327,6 +344,10 @@ public class ConsensusManager {
               configNodeLocation.getConfigNodeId(),
               configNodeLocation.getConsensusEndPoint()));
       return true;
+    } catch (PeerNotInConsensusGroupException e) {
+      LOGGER.info(
+          "ConfigNode peer {} has already been removed: {}", configNodeLocation, e.getMessage());
+      return true;
     } catch (ConsensusException e) {
       return false;
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index dbb35839045..589d4e2a9c7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -230,6 +230,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.db.queryengine.plan.relational.type.AuthorRType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -859,15 +860,19 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation) {
-    if (!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation)) {
+    if (configNodeConfig.getConfigNodeId() != -1
+        && configNodeLocation.getConfigNodeId() != configNodeConfig.getConfigNodeId()) {
       return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
           .setMessage(
-              "remove ConsensusGroup failed because the ConfigNode not in current Cluster.");
+              "remove ConsensusGroup failed because the target ConfigNode is not current ConfigNode.");
     }
 
     ConsensusGroupId groupId = configManager.getConsensusManager().getConsensusGroupId();
     try {
       configManager.getConsensusManager().getConsensusImpl().deleteLocalPeer(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+          .setMessage(ConfigNodeMessages.REMOVE_CONSENSUSGROUP_SUCCESS);
     } catch (ConsensusException e) {
       return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
           .setMessage(
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java
new file mode 100644
index 00000000000..2f3b9b31890
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.consensus;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.confignode.exception.AddPeerException;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ConsensusManagerTest {
+
+  @Test
+  public void createPeerForConsensusGroupShouldIgnoreAlreadyCreatedLocalPeer() throws Exception {
+    final IConsensus consensus = Mockito.mock(IConsensus.class);
+    Mockito.doThrow(
+            new ConsensusGroupAlreadyExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID))
+        .when(consensus)
+        .createLocalPeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList());
+
+    newConsensusManager(consensus)
+        .createPeerForConsensusGroup(Collections.singletonList(newConfigNodeLocation(1)));
+
+    Mockito.verify(consensus)
+        .createLocalPeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList());
+  }
+
+  @Test
+  public void addConfigNodePeerShouldIgnoreAlreadyAddedPeer() throws Exception {
+    final IConsensus consensus = Mockito.mock(IConsensus.class);
+    Mockito.doThrow(
+            new PeerAlreadyInConsensusGroupException(
+                ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID,
+                new Peer(
+                    ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID,
+                    1,
+                    new TEndPoint("127.0.0.1", 10720))))
+        .when(consensus)
+        .addRemotePeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class));
+
+    newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1));
+
+    Mockito.verify(consensus)
+        .addRemotePeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class));
+  }
+
+  @Test
+  public void addConfigNodePeerShouldKeepFailingForOtherConsensusErrors() throws Exception {
+    final IConsensus consensus = Mockito.mock(IConsensus.class);
+    Mockito.doThrow(new ConsensusException("reconfiguration failed"))
+        .when(consensus)
+        .addRemotePeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class));
+
+    Assert.assertThrows(
+        AddPeerException.class,
+        () -> newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1)));
+  }
+
+  @Test
+  public void removeConfigNodePeerShouldIgnoreAlreadyRemovedPeer() throws Exception {
+    final IConsensus consensus = Mockito.mock(IConsensus.class);
+    Mockito.doThrow(
+            new PeerNotInConsensusGroupException(
+                ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, "127.0.0.1:10720"))
+        .when(consensus)
+        .removeRemotePeer(
+            Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class));
+
+    Assert.assertTrue(
+        newConsensusManager(consensus).removeConfigNodePeer(newConfigNodeLocation(1)));
+  }
+
+  private static ConsensusManager newConsensusManager(final IConsensus consensus) throws Exception {
+    return new ConsensusManager(Mockito.mock(IManager.class), consensus);
+  }
+
+  private static TConfigNodeLocation newConfigNodeLocation(final int configNodeId) {
+    return new TConfigNodeLocation(
+        configNodeId,
+        new TEndPoint("127.0.0.1", 10710 + configNodeId),
+        new TEndPoint("127.0.0.1", 10720 + configNodeId));
+  }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index c4d993b5a79..336bba31c37 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -27,12 +27,15 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport;
 
@@ -161,4 +164,48 @@ public class ConfigNodeRPCServiceProcessorTest extends TestCase {
         "1.2.3.4",
         sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp());
   }
+
+  public void testDeleteConfigNodePeerShouldIgnoreMissingLocalPeer() throws Exception {
+    CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+    ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+    ConfigNode configNode = Mockito.mock(ConfigNode.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    IConsensus consensus = Mockito.mock(IConsensus.class);
+    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(consensusManager.getConsensusGroupId())
+        .thenReturn(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID);
+    Mockito.when(consensusManager.getConsensusImpl()).thenReturn(consensus);
+    Mockito.doThrow(
+            new ConsensusGroupNotExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID))
+        .when(consensus)
+        .deleteLocalPeer(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID);
+    ConfigNodeRPCServiceProcessor sut =
+        new ConfigNodeRPCServiceProcessor(
+            commonConfig, configNodeConfig, configNode, configManager);
+
+    TSStatus status = sut.deleteConfigNodePeer(new TConfigNodeLocation());
+
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    Mockito.verify(configManager, Mockito.never()).getNodeManager();
+  }
+
+  public void testDeleteConfigNodePeerShouldRejectMismatchedTarget() throws Exception {
+    CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+    ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+    ConfigNode configNode = Mockito.mock(ConfigNode.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    Mockito.when(configNodeConfig.getConfigNodeId()).thenReturn(2);
+    ConfigNodeRPCServiceProcessor sut =
+        new ConfigNodeRPCServiceProcessor(
+            commonConfig, configNodeConfig, configNode, configManager);
+
+    TSStatus status =
+        sut.deleteConfigNodePeer(
+            new TConfigNodeLocation(
+                1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)));
+
+    Assert.assertEquals(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode(), status.getCode());
+    Mockito.verify(configManager, Mockito.never()).getConsensusManager();
+  }
 }

Reply via email to