This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch enhance_symmetric_network_partition_issue_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6159619ceca0539fb45fbf2bbc13934cc26e875 Author: Potato <[email protected]> AuthorDate: Mon Aug 19 18:08:03 2024 +0800 Avoid the brain split phenomenon in the symmetric network partition scenario #13221 Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../statemachine/ConfigRegionStateMachine.java | 65 ++++++++++++---------- .../org/apache/iotdb/consensus/IStateMachine.java | 5 ++ .../ratis/ApplicationStateMachineProxy.java | 6 ++ .../schemaregion/SchemaRegionStateMachine.java | 29 ++++++---- 4 files changed, 67 insertions(+), 38 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b0d48cb6544..3861521f6e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -240,40 +240,48 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); if (currentNodeId != newLeaderId) { LOGGER.info( - "Current node [nodeId:{}, ip:port: {}] is not longer the leader, " + "Current node [nodeId:{}, ip:port: {}] is no longer the leader, " + "the new leader is [nodeId:{}]", currentNodeId, currentNodeTEndPoint, newLeaderId); + } + } - // Stop leader scheduling services - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .stopSubscriptionMetaSync(); - configManager.getLoadManager().stopLoadServices(); - configManager.getProcedureManager().stopExecutor(); - configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); - configManager.getPartitionManager().stopRegionCleaner(); - configManager.getCQManager().stopCQScheduler(); - configManager.getClusterSchemaManager().clearSchemaQuotaCache(); - // Remove Metric after leader change - configManager.removeMetrics(); - - // Shutdown leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); - - // Clean receiver file dir - PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + @Override + public void notifyNotLeader() { + // We get currentNodeId here because the currentNodeId + // couldn't initialize earlier than the ConfigRegionStateMachine + int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); + LOGGER.info( + "Current node [nodeId:{}, ip:port: {}] is no longer the leader, " + + "start cleaning up related services", + currentNodeId, + currentNodeTEndPoint); + // Stop leader scheduling services + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); + configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync(); + configManager.getLoadManager().stopLoadServices(); + configManager.getProcedureManager().stopExecutor(); + configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); + configManager.getPartitionManager().stopRegionCleaner(); + configManager.getCQManager().stopCQScheduler(); + configManager.getClusterSchemaManager().clearSchemaQuotaCache(); + // Remove Metric after leader change + configManager.removeMetrics(); - LOGGER.info( - "Current node [nodeId:{}, ip:port: {}] is not longer the leader, " - + "all services on old leader are unavailable now.", - currentNodeId, - currentNodeTEndPoint); - } + // Shutdown leader related service for config pipe + PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); + + // Clean receiver file dir + PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + + LOGGER.info( + "Current node [nodeId:{}, ip:port: {}] is no longer the leader, " + + "all services on old leader are unavailable now.", + currentNodeId, + currentNodeTEndPoint); } @Override @@ -482,6 +490,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev } static class FileComparator implements Comparator<String> { + @Override public int compare(String filename1, String filename2) { long id1 = parseEndIndex(filename1); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index bad1c4d9b7e..d0473ca1d04 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -158,6 +158,11 @@ public interface IStateMachine { default void notifyLeaderReady() { // do nothing default } + + /** Notify the {@link IStateMachine} that this server is no longer the leader. */ + default void notifyNotLeader() { + // do nothing default + } } /** diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index e0e6b542dee..12095189c52 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -54,6 +54,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -296,6 +297,11 @@ public class ApplicationStateMachineProxy extends BaseStateMachine { applicationStateMachine.event().notifyLeaderReady(); } + @Override + public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException { + applicationStateMachine.event().notifyNotLeader(); + } + @Override public void notifyConfigurationChanged( long term, long index, RaftConfigurationProto newRaftConfiguration) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index ca2c5f28212..f07cc34e5f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -69,24 +69,33 @@ public class SchemaRegionStateMachine extends BaseStateMachine { @Override public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { - if (schemaRegion.getSchemaRegionId().equals(groupId) - && newLeaderId != IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) { + if (newLeaderId != IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) { logger.info( "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], " + "the new leader is [nodeId:{}]", IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), schemaRegion.getSchemaRegionId(), newLeaderId); + } + } - // Shutdown leader related service for schema pipe - PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId()); + @Override + public void notifyNotLeader() { + int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + logger.info( + "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], " + + "start cleaning up related services.", + dataNodeId, + schemaRegion.getSchemaRegionId()); - logger.info( - "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], " - + "all services on old leader are unavailable now.", - IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), - schemaRegion.getSchemaRegionId()); - } + // Shutdown leader related service for schema pipe + PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId()); + + logger.info( + "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], " + + "all services on old leader are unavailable now.", + dataNodeId, + schemaRegion.getSchemaRegionId()); } @Override
