This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4b9b87960e7f52753459914a90d2d1b86244e6cd Author: Yongzao <[email protected]> AuthorDate: Wed Jun 10 16:24:04 2026 +0800 Bound Ratis reconfiguration retries and add region migration ITs (#17895) --- .../it/env/cluster/config/MppCommonConfig.java | 21 +++ .../env/cluster/config/MppSharedCommonConfig.java | 21 +++ .../it/env/remote/config/RemoteCommonConfig.java | 15 ++ .../org/apache/iotdb/itbase/env/CommonConfig.java | 6 + .../IoTDBRegionMigrateITFrameworkForRatis.java | 42 ++++++ ...IoTDBRegionOperationReliabilityITFramework.java | 128 +++++++++++++--- ...oTDBRegionMigrateAddingPeerCrashForRatisIT.java | 48 ++++++ .../IoTDBRegionMigrateClusterCrashForRatisIT.java | 76 ++++++++++ ...oTDBRegionMigrateConfigNodeCrashForRatisIT.java | 164 +++++++++++++++++++++ .../iotdb/confignode/conf/ConfigNodeConfig.java | 40 +++++ .../confignode/conf/ConfigNodeDescriptor.java | 15 ++ .../manager/consensus/ConsensusManager.java | 2 + .../iotdb/confignode/manager/node/NodeManager.java | 4 + .../apache/iotdb/consensus/config/RatisConfig.java | 22 ++- .../apache/iotdb/consensus/ratis/RatisClient.java | 31 ++-- .../iotdb/consensus/ratis/RatisConsensus.java | 2 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 29 ++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 ++ .../db/consensus/DataRegionConsensusImpl.java | 2 + .../db/consensus/SchemaRegionConsensusImpl.java | 3 + .../impl/DataNodeInternalRPCServiceImpl.java | 13 ++ .../conf/iotdb-system.properties.template | 11 ++ .../src/main/thrift/confignode.thrift | 6 + 23 files changed, 679 insertions(+), 33 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 3bba46f798c..8cece160c13 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -387,6 +387,27 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setConfigNodeRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + setProperty( + "config_node_ratis_reconfiguration_max_retry_attempts", String.valueOf(maxRetryAttempts)); + return this; + } + + @Override + public CommonConfig setSchemaRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + setProperty( + "schema_region_ratis_reconfiguration_max_retry_attempts", String.valueOf(maxRetryAttempts)); + return this; + } + + @Override + public CommonConfig setDataRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + setProperty( + "data_region_ratis_reconfiguration_max_retry_attempts", String.valueOf(maxRetryAttempts)); + return this; + } + @Override public CommonConfig setSeriesSlotNum(int seriesSlotNum) { setProperty("series_slot_num", String.valueOf(seriesSlotNum)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e4..8720e18ab22 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -389,6 +389,27 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setConfigNodeRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + cnConfig.setConfigNodeRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + dnConfig.setConfigNodeRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + return this; + } + + @Override + public CommonConfig setSchemaRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + cnConfig.setSchemaRegionRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + dnConfig.setSchemaRegionRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + return this; + } + + @Override + public CommonConfig setDataRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + cnConfig.setDataRegionRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + dnConfig.setDataRegionRatisReconfigurationMaxRetryAttempts(maxRetryAttempts); + return this; + } + @Override public CommonConfig setSeriesSlotNum(int seriesSlotNum) { cnConfig.setSeriesSlotNum(seriesSlotNum); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957b..4db030e607b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -274,6 +274,21 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setConfigNodeRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + return this; + } + + @Override + public CommonConfig setSchemaRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + return this; + } + + @Override + public CommonConfig setDataRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts) { + return this; + } + @Override public CommonConfig setSeriesSlotNum(int seriesSlotNum) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2ba..5c1b1350adc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -124,6 +124,12 @@ public interface CommonConfig { CommonConfig setDataRatisTriggerSnapshotThreshold(long threshold); + CommonConfig setConfigNodeRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts); + + CommonConfig setSchemaRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts); + + CommonConfig setDataRegionRatisReconfigurationMaxRetryAttempts(int maxRetryAttempts); + CommonConfig setSeriesSlotNum(int seriesSlotNum); CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateITFrameworkForRatis.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateITFrameworkForRatis.java new file mode 100644 index 00000000000..3b08f555d90 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateITFrameworkForRatis.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; + +import org.junit.Before; + +public class IoTDBRegionMigrateITFrameworkForRatis + extends IoTDBRegionOperationReliabilityITFramework { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setConfigNodeRatisReconfigurationMaxRetryAttempts(10) + .setDataRegionRatisReconfigurationMaxRetryAttempts(10) + .setSchemaRegionRatisReconfigurationMaxRetryAttempts(10); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java index 58393ecc61b..9b9c4ad41a1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java @@ -176,6 +176,28 @@ public class IoTDBRegionOperationReliabilityITFramework { killNode); } + public void failAndRollbackTest( + final int dataReplicateFactor, + final int schemaReplicationFactor, + final int configNodeNum, + final int dataNodeNum, + KeySetView<String, Boolean> killConfigNodeKeywords, + KeySetView<String, Boolean> killDataNodeKeywords, + KillNode killNode) + throws Exception { + generalTestWithAllOptions( + dataReplicateFactor, + schemaReplicationFactor, + configNodeNum, + dataNodeNum, + killConfigNodeKeywords, + killDataNodeKeywords, + actionOfKillNode, + false, + killNode, + true); + } + public void killClusterTest( KeySetView<String, Boolean> configNodeKeywords, boolean expectMigrateSuccess) throws Exception { @@ -204,6 +226,31 @@ public class IoTDBRegionOperationReliabilityITFramework { final boolean expectMigrateSuccess, KillNode killNode) throws Exception { + generalTestWithAllOptions( + dataReplicateFactor, + schemaReplicationFactor, + configNodeNum, + dataNodeNum, + configNodeKeywords, + dataNodeKeywords, + actionWhenDetectKeyWords, + expectMigrateSuccess, + killNode, + false); + } + + private void generalTestWithAllOptions( + final int dataReplicateFactor, + final int schemaReplicationFactor, + final int configNodeNum, + final int dataNodeNum, + KeySetView<String, Boolean> configNodeKeywords, + KeySetView<String, Boolean> dataNodeKeywords, + Consumer<KillPointContext> actionWhenDetectKeyWords, + final boolean expectMigrateSuccess, + KillNode killNode, + boolean expectRollbackWhenFail) + throws Exception { // prepare env EnvFactory.getEnv() .getConfig() @@ -266,26 +313,42 @@ public class IoTDBRegionOperationReliabilityITFramework { statement.execute(buildRegionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); boolean success = false; - Predicate<TShowRegionResp> migrateRegionPredicate = - tShowRegionResp -> { - Map<Integer, Set<Integer>> newRegionMap = - getRegionMap(tShowRegionResp.getRegionInfoList()); - Set<Integer> dataNodes = newRegionMap.get(selectedRegion); - return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode); - }; - try { - awaitUntilSuccess( - client, - selectedRegion, - migrateRegionPredicate, - Optional.of(destDataNode), - Optional.of(originalDataNode)); - success = true; - } catch (ConditionTimeoutException e) { - if (expectMigrateSuccess) { - LOGGER.error("Region migrate failed", e); + if (expectRollbackWhenFail) { + awaitKillPointsTriggered(configNodeKeywords); + awaitKillPointsTriggered(dataNodeKeywords); + try { + awaitUntilSuccess( + client, + selectedRegion, + rollbackPredicate(selectedRegion, regionMap.get(selectedRegion), destDataNode), + Optional.empty(), + Optional.of(destDataNode)); + } catch (ConditionTimeoutException e) { + LOGGER.error("Region migrate did not roll back", e); Assert.fail(); } + } else { + Predicate<TShowRegionResp> migrateRegionPredicate = + tShowRegionResp -> { + Map<Integer, Set<Integer>> newRegionMap = + getRegionMap(tShowRegionResp.getRegionInfoList()); + Set<Integer> dataNodes = newRegionMap.get(selectedRegion); + return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode); + }; + try { + awaitUntilSuccess( + client, + selectedRegion, + migrateRegionPredicate, + Optional.of(destDataNode), + Optional.of(originalDataNode)); + success = true; + } catch (ConditionTimeoutException e) { + if (expectMigrateSuccess) { + LOGGER.error("Region migrate failed", e); + Assert.fail(); + } + } } if (!expectMigrateSuccess && success) { LOGGER.error("Region migrate succeeded unexpectedly"); @@ -311,6 +374,28 @@ public class IoTDBRegionOperationReliabilityITFramework { } } + private static Predicate<TShowRegionResp> rollbackPredicate( + int selectedRegion, Set<Integer> originalDataNodes, int destDataNode) { + return tShowRegionResp -> { + List<TRegionInfo> selectedDataRegionInfos = + tShowRegionResp.getRegionInfoList().stream() + .filter( + regionInfo -> + regionInfo.getConsensusGroupId().getType() == TConsensusGroupType.DataRegion + && regionInfo.getConsensusGroupId().getId() == selectedRegion) + .collect(Collectors.toList()); + Set<Integer> dataNodes = + selectedDataRegionInfos.stream() + .map(TRegionInfo::getDataNodeId) + .collect(Collectors.toSet()); + return dataNodes.equals(originalDataNodes) + && !dataNodes.contains(destDataNode) + && selectedDataRegionInfos.stream() + .allMatch( + regionInfo -> RegionStatus.Running.getStatus().equals(regionInfo.getStatus())); + }; + } + public static Set<Integer> getAllDataNodes(Statement statement) throws Exception { ResultSet result = statement.executeQuery(SHOW_DATANODES); Set<Integer> allDataNodeId = new HashSet<>(); @@ -428,6 +513,13 @@ public class IoTDBRegionOperationReliabilityITFramework { } } + private static void awaitKillPointsTriggered(KeySetView<String, Boolean> killPoints) { + if (killPoints.isEmpty()) { + return; + } + Awaitility.await().atMost(2, TimeUnit.MINUTES).until(killPoints::isEmpty); + } + private static String buildRegionMigrateCommand(int who, int from, int to) { String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to); LOGGER.info(result); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/ratis/IoTDBRegionMigrateAddingPeerCrashForRatisIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/ratis/IoTDBRegionMigrateAddingPeerCrashForRatisIT.java new file mode 100644 index 00000000000..4561abb309b --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/ratis/IoTDBRegionMigrateAddingPeerCrashForRatisIT.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.datanodecrash.ratis; + +import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints; +import org.apache.iotdb.commons.utils.KillPoint.KillNode; +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBRegionMigrateAddingPeerCrashForRatisIT + extends IoTDBRegionMigrateITFrameworkForRatis { + + @Test + public void addingPeerCrashShouldFailAndRollback() throws Exception { + failAndRollbackTest( + 2, + 2, + 1, + 3, + noKillPoints(), + buildSet(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER), + KillNode.DESTINATION_DATANODE); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateClusterCrashForRatisIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateClusterCrashForRatisIT.java new file mode 100644 index 00000000000..ed6e51c0d93 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateClusterCrashForRatisIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.ratis; + +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis; +import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; +import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBRegionMigrateClusterCrashForRatisIT + extends IoTDBRegionMigrateITFrameworkForRatis { + + @Test + public void clusterCrashDuringCreateNewRegionPeer() throws Exception { + killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true); + } + + @Test + public void clusterCrashDuringCreateConsensusPipes() throws Exception { + killClusterTest(buildSet(AddRegionPeerState.CREATE_CONSENSUS_PIPES), true); + } + + @Test + public void clusterCrashDuringDoAddRegionPeer() throws Exception { + killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false); + } + + @Test + public void clusterCrashDuringUpdateRegionLocationCache() throws Exception { + killClusterTest(buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE), true); + } + + @Test + public void clusterCrashDuringTransferRegionLeader() throws Exception { + killClusterTest(buildSet(RemoveRegionPeerState.TRANSFER_REGION_LEADER), true); + } + + @Test + public void clusterCrashDuringRemoveRegionPeer() throws Exception { + killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true); + } + + @Test + public void clusterCrashDuringDropConsensusPipes() throws Exception { + killClusterTest(buildSet(RemoveRegionPeerState.DROP_CONSENSUS_PIPES), true); + } + + @Test + public void clusterCrashDuringRemoveRegionLocationCache() throws Exception { + killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE), true); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java new file mode 100644 index 00000000000..67b318691a9 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.ratis; + +import org.apache.iotdb.commons.utils.KillPoint.KillNode; +import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis; +import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; +import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.DailyIT; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Category({DailyIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBRegionMigrateConfigNodeCrashForRatisIT + extends IoTDBRegionMigrateITFrameworkForRatis { + + @Test + public void cnCrashDuringCreatePeerTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringCreateConsensusPipesTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(AddRegionPeerState.CREATE_CONSENSUS_PIPES), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringDoAddPeerTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringUpdateCacheTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringChangeRegionLeaderTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(RemoveRegionPeerState.TRANSFER_REGION_LEADER), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringRemoveRegionPeerTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringDropConsensusPipesTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(RemoveRegionPeerState.DROP_CONSENSUS_PIPES), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { + successTest( + 1, + 1, + 1, + 2, + buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE), + noKillPoints(), + KillNode.CONFIG_NODE); + } + + @Test + public void cnCrashTest() throws Exception { + ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints(); + killConfigNodeKeywords.addAll( + Arrays.stream(AddRegionPeerState.values()) + .map(KillPoint::enumToString) + .collect(Collectors.toList())); + killConfigNodeKeywords.addAll( + Arrays.stream(RemoveRegionPeerState.values()) + .map(KillPoint::enumToString) + .collect(Collectors.toList())); + successTest(1, 1, 1, 2, killConfigNodeKeywords, noKillPoints(), KillNode.CONFIG_NODE); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 1c0555affe3..051a7fc31f7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -318,6 +318,16 @@ public class ConfigNodeConfig { private long schemaRegionRatisInitialSleepTimeMs = 100; private long schemaRegionRatisMaxSleepTimeMs = 10000; + /** + * RatisConsensus protocol, max retry attempts for a configuration change (add/remove peer). Uses + * a fixed 2s retry interval; bounding the attempts stops a killed ADDING peer from blocking the + * reconfiguration -- and hence a region migration -- forever. + */ + private int configNodeRatisReconfigurationMaxRetryAttempts = 15; + + private int dataRegionRatisReconfigurationMaxRetryAttempts = 15; + private int schemaRegionRatisReconfigurationMaxRetryAttempts = 15; + private long configNodeRatisPreserveLogsWhenPurge = 1000; private long schemaRegionRatisPreserveLogsWhenPurge = 1000; private long dataRegionRatisPreserveLogsWhenPurge = 1000; @@ -1117,6 +1127,36 @@ public class ConfigNodeConfig { this.schemaRegionRatisMaxRetryAttempts = schemaRegionRatisMaxRetryAttempts; } + public int getConfigNodeRatisReconfigurationMaxRetryAttempts() { + return configNodeRatisReconfigurationMaxRetryAttempts; + } + + public void setConfigNodeRatisReconfigurationMaxRetryAttempts( + int configNodeRatisReconfigurationMaxRetryAttempts) { + this.configNodeRatisReconfigurationMaxRetryAttempts = + configNodeRatisReconfigurationMaxRetryAttempts; + } + + public int getDataRegionRatisReconfigurationMaxRetryAttempts() { + return dataRegionRatisReconfigurationMaxRetryAttempts; + } + + public void setDataRegionRatisReconfigurationMaxRetryAttempts( + int dataRegionRatisReconfigurationMaxRetryAttempts) { + this.dataRegionRatisReconfigurationMaxRetryAttempts = + dataRegionRatisReconfigurationMaxRetryAttempts; + } + + public int getSchemaRegionRatisReconfigurationMaxRetryAttempts() { + return schemaRegionRatisReconfigurationMaxRetryAttempts; + } + + public void setSchemaRegionRatisReconfigurationMaxRetryAttempts( + int schemaRegionRatisReconfigurationMaxRetryAttempts) { + this.schemaRegionRatisReconfigurationMaxRetryAttempts = + schemaRegionRatisReconfigurationMaxRetryAttempts; + } + public long getSchemaRegionRatisInitialSleepTimeMs() { return schemaRegionRatisInitialSleepTimeMs; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index b6bf74edb31..7321431a12f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -629,6 +629,11 @@ public class ConfigNodeDescriptor { properties.getProperty( "config_node_ratis_max_retry_attempts", String.valueOf(conf.getConfigNodeRatisMaxRetryAttempts())))); + conf.setConfigNodeRatisReconfigurationMaxRetryAttempts( + Integer.parseInt( + properties.getProperty( + "config_node_ratis_reconfiguration_max_retry_attempts", + String.valueOf(conf.getConfigNodeRatisReconfigurationMaxRetryAttempts())))); conf.setConfigNodeRatisInitialSleepTimeMs( Long.parseLong( properties.getProperty( @@ -645,6 +650,11 @@ public class ConfigNodeDescriptor { properties.getProperty( "data_region_ratis_max_retry_attempts", String.valueOf(conf.getDataRegionRatisMaxRetryAttempts())))); + conf.setDataRegionRatisReconfigurationMaxRetryAttempts( + Integer.parseInt( + properties.getProperty( + "data_region_ratis_reconfiguration_max_retry_attempts", + String.valueOf(conf.getDataRegionRatisReconfigurationMaxRetryAttempts())))); conf.setDataRegionRatisInitialSleepTimeMs( Long.parseLong( properties.getProperty( @@ -661,6 +671,11 @@ public class ConfigNodeDescriptor { properties.getProperty( "schema_region_ratis_max_retry_attempts", String.valueOf(conf.getSchemaRegionRatisMaxRetryAttempts())))); + conf.setSchemaRegionRatisReconfigurationMaxRetryAttempts( + Integer.parseInt( + properties.getProperty( + "schema_region_ratis_reconfiguration_max_retry_attempts", + String.valueOf(conf.getSchemaRegionRatisReconfigurationMaxRetryAttempts())))); conf.setSchemaRegionRatisInitialSleepTimeMs( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index d7024c357b1..edc952e499b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -236,6 +236,8 @@ public class ConsensusManager { .setClientRetryMaxSleepTimeMs( CONF.getConfigNodeRatisMaxSleepTimeMs()) .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode()) + .setReconfigurationMaxRetryAttempts( + CONF.getConfigNodeRatisReconfigurationMaxRetryAttempts()) .build()) .setImpl( RatisConfig.Impl.newBuilder() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 928db046980..4479fb77711 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -233,6 +233,10 @@ public class NodeManager { ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts()); ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs()); ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs()); + ratisConfig.setDataReconfigurationMaxRetryAttempts( + conf.getDataRegionRatisReconfigurationMaxRetryAttempts()); + ratisConfig.setSchemaReconfigurationMaxRetryAttempts( + conf.getSchemaRegionRatisReconfigurationMaxRetryAttempts()); ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge()); ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java index ddbfac62111..4b22764b7f4 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java @@ -865,18 +865,21 @@ public class RatisConfig { private final long clientRetryInitialSleepTimeMs; private final long clientRetryMaxSleepTimeMs; private final int maxClientNumForEachNode; + private final int reconfigurationMaxRetryAttempts; public Client( long clientRequestTimeoutMillis, int clientMaxRetryAttempt, long clientRetryInitialSleepTimeMs, long clientRetryMaxSleepTimeMs, - int maxClientNumForEachNode) { + int maxClientNumForEachNode, + int reconfigurationMaxRetryAttempts) { this.clientRequestTimeoutMillis = clientRequestTimeoutMillis; this.clientMaxRetryAttempt = clientMaxRetryAttempt; this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs; this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs; this.maxClientNumForEachNode = maxClientNumForEachNode; + this.reconfigurationMaxRetryAttempts = reconfigurationMaxRetryAttempts; } public long getClientRequestTimeoutMillis() { @@ -899,6 +902,10 @@ public class RatisConfig { return maxClientNumForEachNode; } + public int getReconfigurationMaxRetryAttempts() { + return reconfigurationMaxRetryAttempts; + } + public static Client.Builder newBuilder() { return new Builder(); } @@ -910,6 +917,11 @@ public class RatisConfig { private long clientRetryInitialSleepTimeMs = 100; private long clientRetryMaxSleepTimeMs = 10000; private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + // A Ratis configuration change (add/remove peer) retries the "in progress / not ready" + // failures with a fixed 2s interval. Bounding the number of attempts (instead of retrying + // forever) prevents a killed ADDING peer that can never catch up from blocking the + // reconfiguration -- and hence the region migration -- indefinitely. 15 attempts ~= 30s. + private int reconfigurationMaxRetryAttempts = 15; public Client build() { return new Client( @@ -917,7 +929,8 @@ public class RatisConfig { clientMaxRetryAttempt, clientRetryInitialSleepTimeMs, clientRetryMaxSleepTimeMs, - maxClientNumForEachNode); + maxClientNumForEachNode, + reconfigurationMaxRetryAttempts); } public Builder setClientRequestTimeoutMillis(long clientRequestTimeoutMillis) { @@ -944,6 +957,11 @@ public class RatisConfig { this.maxClientNumForEachNode = maxClientNumForEachNode; return this; } + + public Builder setReconfigurationMaxRetryAttempts(int reconfigurationMaxRetryAttempts) { + this.reconfigurationMaxRetryAttempts = reconfigurationMaxRetryAttempts; + return this; + } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java index a1adf727129..41ae201d1cf 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java @@ -132,14 +132,14 @@ class RatisClient implements AutoCloseable { } } - static class EndlessRetryFactory extends BaseClientFactory<RaftGroup, RatisClient> { + static class ReconfigurationRetryFactory extends BaseClientFactory<RaftGroup, RatisClient> { private final RaftProperties raftProperties; private final RaftClientRpc clientRpc; private final RatisConfig.Client config; private final Parameters parameters; - public EndlessRetryFactory( + public ReconfigurationRetryFactory( ClientManager<RaftGroup, RatisClient> clientManager, RaftProperties raftProperties, RaftClientRpc clientRpc, @@ -165,7 +165,7 @@ class RatisClient implements AutoCloseable { RaftClient.newBuilder() .setProperties(raftProperties) .setRaftGroup(group) - .setRetryPolicy(new RatisEndlessRetryPolicy(config)) + .setRetryPolicy(new RatisReconfigurationRetryPolicy(config)) .setParameters(parameters) .setClientRpc(clientRpc) .build(), @@ -226,16 +226,23 @@ class RatisClient implements AutoCloseable { } /** This policy is used to raft configuration change */ - private static class RatisEndlessRetryPolicy implements RetryPolicy { - - private static final Logger logger = LoggerFactory.getLogger(RatisEndlessRetryPolicy.class); - // for reconfiguration request, we use different retry policy - private final RetryPolicy endlessPolicy; + private static class RatisReconfigurationRetryPolicy implements RetryPolicy { + + private static final Logger logger = + LoggerFactory.getLogger(RatisReconfigurationRetryPolicy.class); + // For a reconfiguration request we retry the "in progress / not ready" failures with a fixed + // 2s interval, but only up to a bounded number of attempts. An unbounded retry (the previous + // behavior) would block the setConfiguration call forever when a newly ADDING peer is killed + // and can never catch up, leaving the region migration permanently stuck. After the bound is + // exhausted the last failure is propagated, so the upper layer can fail and roll back. + private final RetryPolicy reconfigurationPolicy; private final RetryPolicy defaultPolicy; - RatisEndlessRetryPolicy(RatisConfig.Client config) { - endlessPolicy = - RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(2, TimeUnit.SECONDS)); + RatisReconfigurationRetryPolicy(RatisConfig.Client config) { + reconfigurationPolicy = + RetryPolicies.retryUpToMaximumCountWithFixedSleep( + config.getReconfigurationMaxRetryAttempts(), + TimeDuration.valueOf(2, TimeUnit.SECONDS)); defaultPolicy = new RatisRetryPolicy(config); } @@ -248,7 +255,7 @@ class RatisClient implements AutoCloseable { || cause instanceof LeaderSteppingDownException || cause instanceof ServerNotReadyException || cause instanceof NotLeaderException) { - return endlessPolicy.handleAttemptFailure(event); + return reconfigurationPolicy.handleAttemptFailure(event); } return defaultPolicy.handleAttemptFailure(event); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 372b45b8afc..558b57ebc3d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -1023,7 +1023,7 @@ class RatisConsensus implements IConsensus { GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool = new GenericKeyedObjectPool<>( isReconfiguration - ? new RatisClient.EndlessRetryFactory( + ? new RatisClient.ReconfigurationRetryFactory( manager, properties, clientRpc, config.getClient(), parameters) : new RatisClient.Factory( manager, properties, clientRpc, config.getClient(), parameters), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index fec40a15150..e45085bd4be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1062,6 +1062,15 @@ public class IoTDBConfig { private int dataRatisConsensusMaxRetryAttempts = 10; private int schemaRatisConsensusMaxRetryAttempts = 10; + + /** + * RatisConsensus protocol, max retry attempts for a configuration change (add/remove peer). Uses + * a fixed 2s retry interval; bounding the attempts stops a killed ADDING peer from blocking the + * reconfiguration -- and hence a region migration -- forever. Pushed from the ConfigNode. + */ + private int dataRatisConsensusReconfigurationMaxRetryAttempts = 15; + + private int schemaRatisConsensusReconfigurationMaxRetryAttempts = 15; private long dataRatisConsensusInitialSleepTimeMs = 100L; private long schemaRatisConsensusInitialSleepTimeMs = 100L; private long dataRatisConsensusMaxSleepTimeMs = 10000L; @@ -3832,6 +3841,26 @@ public class IoTDBConfig { this.schemaRatisConsensusMaxRetryAttempts = schemaRatisConsensusMaxRetryAttempts; } + public int getDataRatisConsensusReconfigurationMaxRetryAttempts() { + return dataRatisConsensusReconfigurationMaxRetryAttempts; + } + + public void setDataRatisConsensusReconfigurationMaxRetryAttempts( + int dataRatisConsensusReconfigurationMaxRetryAttempts) { + this.dataRatisConsensusReconfigurationMaxRetryAttempts = + dataRatisConsensusReconfigurationMaxRetryAttempts; + } + + public int getSchemaRatisConsensusReconfigurationMaxRetryAttempts() { + return schemaRatisConsensusReconfigurationMaxRetryAttempts; + } + + public void setSchemaRatisConsensusReconfigurationMaxRetryAttempts( + int schemaRatisConsensusReconfigurationMaxRetryAttempts) { + this.schemaRatisConsensusReconfigurationMaxRetryAttempts = + schemaRatisConsensusReconfigurationMaxRetryAttempts; + } + public long getDataRatisConsensusInitialSleepTimeMs() { return dataRatisConsensusInitialSleepTimeMs; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 8cba13b2265..b6237d435a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2965,6 +2965,17 @@ public class IoTDBDescriptor { conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime()); conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime()); + // Optional fields: an old ConfigNode (rolling upgrade) will not set them, in which case the + // DataNode keeps its local default instead of overwriting it with 0. + if (ratisConfig.isSetDataReconfigurationMaxRetryAttempts()) { + conf.setDataRatisConsensusReconfigurationMaxRetryAttempts( + ratisConfig.getDataReconfigurationMaxRetryAttempts()); + } + if (ratisConfig.isSetSchemaReconfigurationMaxRetryAttempts()) { + conf.setSchemaRatisConsensusReconfigurationMaxRetryAttempts( + ratisConfig.getSchemaReconfigurationMaxRetryAttempts()); + } + conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge()); conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 6fc3bbdb596..e2ddd74bbe9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -273,6 +273,8 @@ public class DataRegionConsensusImpl { CONF.getDataRatisConsensusInitialSleepTimeMs()) .setClientRetryMaxSleepTimeMs(CONF.getDataRatisConsensusMaxSleepTimeMs()) .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode()) + .setReconfigurationMaxRetryAttempts( + CONF.getDataRatisConsensusReconfigurationMaxRetryAttempts()) .build()) .setImpl( RatisConfig.Impl.newBuilder() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java index e5b9fbe15d0..f6a1175da19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java @@ -155,6 +155,9 @@ public class SchemaRegionConsensusImpl { .setClientRetryMaxSleepTimeMs( CONF.getDataRatisConsensusMaxSleepTimeMs()) .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode()) + .setReconfigurationMaxRetryAttempts( + CONF + .getSchemaRatisConsensusReconfigurationMaxRetryAttempts()) .build()) .setImpl( RatisConfig.Impl.newBuilder() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index f7f2f13b14c..922fc3bbedd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -99,6 +99,8 @@ import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints; +import org.apache.iotdb.commons.utils.KillPoint.KillPoint; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.SerializeUtils; import org.apache.iotdb.commons.utils.StatusUtils; @@ -3166,10 +3168,21 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface REGION_MIGRATE_PROCESS, peers, regionId); + if (isRatisConsensusRegion(regionId)) { + KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER); + } status.setMessage(DataNodeMiscMessages.CREATE_NEW_REGION_PEER_SUCCEED_REGION_ID + regionId); return status; } + private boolean isRatisConsensusRegion(ConsensusGroupId regionId) { + return regionId instanceof DataRegionId + ? ConsensusFactory.RATIS_CONSENSUS.equals( + IoTDBDescriptor.getInstance().getConfig().getDataRegionConsensusProtocolClass()) + : ConsensusFactory.RATIS_CONSENSUS.equals( + IoTDBDescriptor.getInstance().getConfig().getSchemaRegionConsensusProtocolClass()); + } + @Override public TSStatus cleanDataNodeCache(TCleanDataNodeCacheReq req) { LOGGER.info(DataNodeMiscMessages.START_DISABLE_DATA_NODE, req); diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 3b2d5ff5473..e13706268ae 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -2047,6 +2047,17 @@ data_region_ratis_max_retry_attempts=10 data_region_ratis_initial_sleep_time_ms=100 data_region_ratis_max_sleep_time_ms=10000 +# Max retry attempts for a Ratis configuration change (add/remove peer), e.g. during region +# migration or cluster scale-in/out. Unlike the request retry policy above, reconfiguration retries +# use a fixed 2s interval, so this roughly caps the wait at (attempts * 2s). Bounding it (instead of +# retrying forever) prevents a killed ADDING peer that can never catch up from blocking the +# reconfiguration -- and therefore the whole region migration -- indefinitely. +# effectiveMode: restart +# Datatype: int +config_node_ratis_reconfiguration_max_retry_attempts=15 +schema_region_ratis_reconfiguration_max_retry_attempts=15 +data_region_ratis_reconfiguration_max_retry_attempts=15 + # first election timeout # effectiveMode: restart # Datatype: int diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb73..ba62bbdf282 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -104,6 +104,12 @@ struct TRatisConfig { 34: required i64 dataRegionPeriodicSnapshotInterval 35: required i32 ratisTransferLeaderTimeoutMs; + + // Bound the retry attempts of a Ratis configuration change (add/remove peer) so a killed ADDING + // peer cannot block the reconfiguration forever. Optional for rolling-upgrade compatibility: an + // old ConfigNode will not set them and the DataNode falls back to its local default. + 36: optional i32 schemaReconfigurationMaxRetryAttempts + 37: optional i32 dataReconfigurationMaxRetryAttempts } struct TCQConfig {
