This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new b27f75295ed Avoid the brain split phenomenon in the symmetric network partition scenario #13221 (#13226)
b27f75295ed is described below

commit b27f75295ed66f6387c11f5b46ae3de22b764e74
Author: Potato <[email protected]>
AuthorDate: Mon Aug 19 20:36:17 2024 +0800

    Avoid the brain split phenomenon in the symmetric network partition scenario #13221 (#13226)
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../statemachine/ConfigRegionStateMachine.java     | 65 ++++++++++++----------
 .../org/apache/iotdb/consensus/IStateMachine.java  |  5 ++
 .../ratis/ApplicationStateMachineProxy.java        |  6 ++
 .../schemaregion/SchemaRegionStateMachine.java     | 29 ++++++----
 4 files changed, 67 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b0d48cb6544..3861521f6e9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -240,40 +240,48 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
     int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
     if (currentNodeId != newLeaderId) {
       LOGGER.info(
-          "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
+          "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
               + "the new leader is [nodeId:{}]",
           currentNodeId,
           currentNodeTEndPoint,
           newLeaderId);
+    }
+  }
 
-      // Stop leader scheduling services
-      configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
-      configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
-      configManager
-          .getSubscriptionManager()
-          .getSubscriptionCoordinator()
-          .stopSubscriptionMetaSync();
-      configManager.getLoadManager().stopLoadServices();
-      configManager.getProcedureManager().stopExecutor();
-      configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
-      configManager.getPartitionManager().stopRegionCleaner();
-      configManager.getCQManager().stopCQScheduler();
-      configManager.getClusterSchemaManager().clearSchemaQuotaCache();
-      // Remove Metric after leader change
-      configManager.removeMetrics();
-
-      // Shutdown leader related service for config pipe
-      PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
-
-      // Clean receiver file dir
-      PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+  @Override
+  public void notifyNotLeader() {
+    // We get currentNodeId here because the currentNodeId
+    // couldn't initialize earlier than the ConfigRegionStateMachine
+    int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+    LOGGER.info(
+        "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
+            + "start cleaning up related services",
+        currentNodeId,
+        currentNodeTEndPoint);
+    // Stop leader scheduling services
+    configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
+    configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
+    configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
+    configManager.getLoadManager().stopLoadServices();
+    configManager.getProcedureManager().stopExecutor();
+    configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
+    configManager.getPartitionManager().stopRegionCleaner();
+    configManager.getCQManager().stopCQScheduler();
+    configManager.getClusterSchemaManager().clearSchemaQuotaCache();
+    // Remove Metric after leader change
+    configManager.removeMetrics();
 
-      LOGGER.info(
-          "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
-              + "all services on old leader are unavailable now.",
-          currentNodeId,
-          currentNodeTEndPoint);
-    }
+    // Shutdown leader related service for config pipe
+    PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
+
+    // Clean receiver file dir
+    PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+
+    LOGGER.info(
+        "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
+            + "all services on old leader are unavailable now.",
+        currentNodeId,
+        currentNodeTEndPoint);
   }
 
   @Override
@@ -482,6 +490,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
   }
 
   static class FileComparator implements Comparator<String> {
+
     @Override
     public int compare(String filename1, String filename2) {
       long id1 = parseEndIndex(filename1);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index bad1c4d9b7e..d0473ca1d04 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -158,6 +158,11 @@ public interface IStateMachine {
     default void notifyLeaderReady() {
       // do nothing default
     }
+
+    /** Notify the {@link IStateMachine} that this server is no longer the leader. */
+    default void notifyNotLeader() {
+      // do nothing default
+    }
   }
 
   /**
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index e0e6b542dee..12095189c52 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -54,6 +54,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 
@@ -296,6 +297,11 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     applicationStateMachine.event().notifyLeaderReady();
   }
 
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException {
+    applicationStateMachine.event().notifyNotLeader();
+  }
+
   @Override
   public void notifyConfigurationChanged(
       long term, long index, RaftConfigurationProto newRaftConfiguration) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index ca2c5f28212..f07cc34e5f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -69,24 +69,33 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   @Override
   public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
-    if (schemaRegion.getSchemaRegionId().equals(groupId)
-        && newLeaderId != IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) {
+    if (newLeaderId != IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) {
       logger.info(
           "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], "
               + "the new leader is [nodeId:{}]",
           IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
           schemaRegion.getSchemaRegionId(),
           newLeaderId);
+    }
+  }
 
-      // Shutdown leader related service for schema pipe
-      PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
+  @Override
+  public void notifyNotLeader() {
+    int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+    logger.info(
+        "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], "
+            + "start cleaning up related services.",
+        dataNodeId,
+        schemaRegion.getSchemaRegionId());
 
-      logger.info(
-          "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], "
-              + "all services on old leader are unavailable now.",
-          IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
-          schemaRegion.getSchemaRegionId());
-    }
+    // Shutdown leader related service for schema pipe
+    PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
+
+    logger.info(
+        "Current node [nodeId: {}] is no longer the schema region leader [regionId: {}], "
+            + "all services on old leader are unavailable now.",
+        dataNodeId,
+        schemaRegion.getSchemaRegionId());
   }
 
   @Override

Reply via email to