This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b88c82b5ced Avoid the brain split phenomenon in the symmetric network
partition scenario #13221
b88c82b5ced is described below
commit b88c82b5cedae39adf23ad91552ff663936ab45a
Author: Potato <[email protected]>
AuthorDate: Mon Aug 19 18:08:03 2024 +0800
Avoid the brain split phenomenon in the symmetric network partition
scenario #13221
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../statemachine/ConfigRegionStateMachine.java | 65 ++++++++++++----------
.../org/apache/iotdb/consensus/IStateMachine.java | 5 ++
.../ratis/ApplicationStateMachineProxy.java | 6 ++
.../schemaregion/SchemaRegionStateMachine.java | 29 ++++++----
4 files changed, 67 insertions(+), 38 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index b0d48cb6544..3861521f6e9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -240,40 +240,48 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
if (currentNodeId != newLeaderId) {
LOGGER.info(
- "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
+ "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
+ "the new leader is [nodeId:{}]",
currentNodeId,
currentNodeTEndPoint,
newLeaderId);
+ }
+ }
- // Stop leader scheduling services
-
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
-
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
- configManager
- .getSubscriptionManager()
- .getSubscriptionCoordinator()
- .stopSubscriptionMetaSync();
- configManager.getLoadManager().stopLoadServices();
- configManager.getProcedureManager().stopExecutor();
- configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
- configManager.getPartitionManager().stopRegionCleaner();
- configManager.getCQManager().stopCQScheduler();
- configManager.getClusterSchemaManager().clearSchemaQuotaCache();
- // Remove Metric after leader change
- configManager.removeMetrics();
-
- // Shutdown leader related service for config pipe
- PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
-
- // Clean receiver file dir
- PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+ @Override
+ public void notifyNotLeader() {
+ // We read currentNodeId here because the currentNodeId
+ // cannot be initialized earlier than the ConfigRegionStateMachine
+ int currentNodeId =
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+ LOGGER.info(
+ "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
+ + "start cleaning up related services",
+ currentNodeId,
+ currentNodeTEndPoint);
+ // Stop leader scheduling services
+
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
+
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
+
configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
+ configManager.getLoadManager().stopLoadServices();
+ configManager.getProcedureManager().stopExecutor();
+ configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
+ configManager.getPartitionManager().stopRegionCleaner();
+ configManager.getCQManager().stopCQScheduler();
+ configManager.getClusterSchemaManager().clearSchemaQuotaCache();
+ // Remove Metric after leader change
+ configManager.removeMetrics();
- LOGGER.info(
- "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
- + "all services on old leader are unavailable now.",
- currentNodeId,
- currentNodeTEndPoint);
- }
+ // Shutdown leader related service for config pipe
+ PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
+
+ // Clean receiver file dir
+ PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+
+ LOGGER.info(
+ "Current node [nodeId:{}, ip:port: {}] is no longer the leader, "
+ + "all services on old leader are unavailable now.",
+ currentNodeId,
+ currentNodeTEndPoint);
}
@Override
@@ -482,6 +490,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
}
static class FileComparator implements Comparator<String> {
+
@Override
public int compare(String filename1, String filename2) {
long id1 = parseEndIndex(filename1);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index c6f0b7d6173..692b60beb2a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -158,6 +158,11 @@ public interface IStateMachine {
default void notifyLeaderReady() {
// do nothing default
}
+
+ /** Notify the {@link IStateMachine} that this server is no longer the leader. */
+ default void notifyNotLeader() {
+ // do nothing default
+ }
}
/**
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index e0e6b542dee..12095189c52 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -54,6 +54,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
@@ -296,6 +297,11 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
applicationStateMachine.event().notifyLeaderReady();
}
+ @Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
throws IOException {
+ applicationStateMachine.event().notifyNotLeader();
+ }
+
@Override
public void notifyConfigurationChanged(
long term, long index, RaftConfigurationProto newRaftConfiguration) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
index ca2c5f28212..f07cc34e5f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
@@ -69,24 +69,33 @@ public class SchemaRegionStateMachine extends
BaseStateMachine {
@Override
public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
- if (schemaRegion.getSchemaRegionId().equals(groupId)
- && newLeaderId !=
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) {
+ if (newLeaderId !=
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()) {
logger.info(
"Current node [nodeId: {}] is no longer the schema region leader
[regionId: {}], "
+ "the new leader is [nodeId:{}]",
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
schemaRegion.getSchemaRegionId(),
newLeaderId);
+ }
+ }
- // Shutdown leader related service for schema pipe
-
PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
+ @Override
+ public void notifyNotLeader() {
+ int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ logger.info(
+ "Current node [nodeId: {}] is no longer the schema region leader
[regionId: {}], "
+ + "start cleaning up related services.",
+ dataNodeId,
+ schemaRegion.getSchemaRegionId());
- logger.info(
- "Current node [nodeId: {}] is no longer the schema region leader
[regionId: {}], "
- + "all services on old leader are unavailable now.",
- IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
- schemaRegion.getSchemaRegionId());
- }
+ // Shutdown leader related service for schema pipe
+
PipeDataNodeAgent.runtime().notifySchemaLeaderUnavailable(schemaRegion.getSchemaRegionId());
+
+ logger.info(
+ "Current node [nodeId: {}] is no longer the schema region leader
[regionId: {}], "
+ + "all services on old leader are unavailable now.",
+ dataNodeId,
+ schemaRegion.getSchemaRegionId());
}
@Override