This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c3e74a2a68e Fix AddConfigNode retry idempotency (#17874)
c3e74a2a68e is described below
commit c3e74a2a68ebea55ffa3cec682af82e76f4244ce
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 9 16:54:16 2026 +0800
Fix AddConfigNode retry idempotency (#17874)
---
.../manager/consensus/ConsensusManager.java | 23 +++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 +-
.../manager/consensus/ConsensusManagerTest.java | 116 +++++++++++++++++++++
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 47 +++++++++
4 files changed, 192 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 8b4eeed5a1b..84594b0d7a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -44,6 +45,9 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -86,6 +90,12 @@ public class ConsensusManager {
setConsensusLayer(stateMachine);
}
+ @TestOnly
+ ConsensusManager(IManager configManager, IConsensus consensusImpl) {
+ this.configManager = configManager;
+ this.consensusImpl = consensusImpl;
+ }
+
public void start() throws IOException {
consensusImpl.start();
if (SystemPropertiesUtils.isRestarted()) {
@@ -289,7 +299,11 @@ public class ConsensusManager {
configNodeLocation.getConfigNodeId(),
configNodeLocation.getConsensusEndPoint()));
}
- consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
+ try {
+ consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
+ } catch (ConsensusGroupAlreadyExistException e) {
+ LOGGER.info("ConfigNode local peer has already been created: {}",
e.getMessage());
+ }
}
/**
@@ -306,6 +320,9 @@ public class ConsensusManager {
DEFAULT_CONSENSUS_GROUP_ID,
configNodeLocation.getConfigNodeId(),
configNodeLocation.getConsensusEndPoint()));
+ } catch (PeerAlreadyInConsensusGroupException e) {
+ LOGGER.info(
+ "ConfigNode peer {} has already been added: {}", configNodeLocation,
e.getMessage());
} catch (ConsensusException e) {
throw new AddPeerException(configNodeLocation);
}
@@ -327,6 +344,10 @@ public class ConsensusManager {
configNodeLocation.getConfigNodeId(),
configNodeLocation.getConsensusEndPoint()));
return true;
+ } catch (PeerNotInConsensusGroupException e) {
+ LOGGER.info(
+ "ConfigNode peer {} has already been removed: {}",
configNodeLocation, e.getMessage());
+ return true;
} catch (ConsensusException e) {
return false;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d16642be9d9..efd3f8abe80 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -232,6 +232,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.queryengine.plan.relational.type.AuthorRType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -861,15 +862,19 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation)
{
- if
(!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation))
{
+ if (configNodeConfig.getConfigNodeId() != -1
+ && configNodeLocation.getConfigNodeId() !=
configNodeConfig.getConfigNodeId()) {
return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
.setMessage(
- "remove ConsensusGroup failed because the ConfigNode not in
current Cluster.");
+ "remove ConsensusGroup failed because the target ConfigNode is
not current ConfigNode.");
}
ConsensusGroupId groupId =
configManager.getConsensusManager().getConsensusGroupId();
try {
configManager.getConsensusManager().getConsensusImpl().deleteLocalPeer(groupId);
+ } catch (ConsensusGroupNotExistException e) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .setMessage(ConfigNodeMessages.REMOVE_CONSENSUSGROUP_SUCCESS);
} catch (ConsensusException e) {
return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
.setMessage(
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java
new file mode 100644
index 00000000000..2f3b9b31890
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.consensus;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.confignode.exception.AddPeerException;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ConsensusManagerTest {
+
+ @Test
+ public void createPeerForConsensusGroupShouldIgnoreAlreadyCreatedLocalPeer()
throws Exception {
+ final IConsensus consensus = Mockito.mock(IConsensus.class);
+ Mockito.doThrow(
+ new
ConsensusGroupAlreadyExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID))
+ .when(consensus)
+ .createLocalPeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.anyList());
+
+ newConsensusManager(consensus)
+
.createPeerForConsensusGroup(Collections.singletonList(newConfigNodeLocation(1)));
+
+ Mockito.verify(consensus)
+ .createLocalPeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.anyList());
+ }
+
+ @Test
+ public void addConfigNodePeerShouldIgnoreAlreadyAddedPeer() throws Exception
{
+ final IConsensus consensus = Mockito.mock(IConsensus.class);
+ Mockito.doThrow(
+ new PeerAlreadyInConsensusGroupException(
+ ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID,
+ new Peer(
+ ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID,
+ 1,
+ new TEndPoint("127.0.0.1", 10720))))
+ .when(consensus)
+ .addRemotePeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.any(Peer.class));
+
+ newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1));
+
+ Mockito.verify(consensus)
+ .addRemotePeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.any(Peer.class));
+ }
+
+ @Test
+ public void addConfigNodePeerShouldKeepFailingForOtherConsensusErrors()
throws Exception {
+ final IConsensus consensus = Mockito.mock(IConsensus.class);
+ Mockito.doThrow(new ConsensusException("reconfiguration failed"))
+ .when(consensus)
+ .addRemotePeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.any(Peer.class));
+
+ Assert.assertThrows(
+ AddPeerException.class,
+ () ->
newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1)));
+ }
+
+ @Test
+ public void removeConfigNodePeerShouldIgnoreAlreadyRemovedPeer() throws
Exception {
+ final IConsensus consensus = Mockito.mock(IConsensus.class);
+ Mockito.doThrow(
+ new PeerNotInConsensusGroupException(
+ ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID,
"127.0.0.1:10720"))
+ .when(consensus)
+ .removeRemotePeer(
+ Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID),
Mockito.any(Peer.class));
+
+ Assert.assertTrue(
+
newConsensusManager(consensus).removeConfigNodePeer(newConfigNodeLocation(1)));
+ }
+
+ private static ConsensusManager newConsensusManager(final IConsensus
consensus) throws Exception {
+ return new ConsensusManager(Mockito.mock(IManager.class), consensus);
+ }
+
+ private static TConfigNodeLocation newConfigNodeLocation(final int
configNodeId) {
+ return new TConfigNodeLocation(
+ configNodeId,
+ new TEndPoint("127.0.0.1", 10710 + configNodeId),
+ new TEndPoint("127.0.0.1", 10720 + configNodeId));
+ }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index c4d993b5a79..336bba31c37 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -27,12 +27,15 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import
org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport;
@@ -161,4 +164,48 @@ public class ConfigNodeRPCServiceProcessorTest extends TestCase {
"1.2.3.4",
sentRequest.getDataNodeConfiguration().getLocation().getClientRpcEndPoint().getIp());
}
+
+ public void testDeleteConfigNodePeerShouldIgnoreMissingLocalPeer() throws
Exception {
+ CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+ ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+ ConfigNode configNode = Mockito.mock(ConfigNode.class);
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+ IConsensus consensus = Mockito.mock(IConsensus.class);
+
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+ Mockito.when(consensusManager.getConsensusGroupId())
+ .thenReturn(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID);
+ Mockito.when(consensusManager.getConsensusImpl()).thenReturn(consensus);
+ Mockito.doThrow(
+ new
ConsensusGroupNotExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID))
+ .when(consensus)
+ .deleteLocalPeer(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID);
+ ConfigNodeRPCServiceProcessor sut =
+ new ConfigNodeRPCServiceProcessor(
+ commonConfig, configNodeConfig, configNode, configManager);
+
+ TSStatus status = sut.deleteConfigNodePeer(new TConfigNodeLocation());
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Mockito.verify(configManager, Mockito.never()).getNodeManager();
+ }
+
+ public void testDeleteConfigNodePeerShouldRejectMismatchedTarget() throws
Exception {
+ CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+ ConfigNodeConfig configNodeConfig = Mockito.mock(ConfigNodeConfig.class);
+ ConfigNode configNode = Mockito.mock(ConfigNode.class);
+ ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ Mockito.when(configNodeConfig.getConfigNodeId()).thenReturn(2);
+ ConfigNodeRPCServiceProcessor sut =
+ new ConfigNodeRPCServiceProcessor(
+ commonConfig, configNodeConfig, configNode, configManager);
+
+ TSStatus status =
+ sut.deleteConfigNodePeer(
+ new TConfigNodeLocation(
+ 1, new TEndPoint("127.0.0.1", 10710), new
TEndPoint("127.0.0.1", 10720)));
+
+ Assert.assertEquals(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode(),
status.getCode());
+ Mockito.verify(configManager, Mockito.never()).getConsensusManager();
+ }
}