This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a322ab556a6 Fix the issue of restarting DataNode to clean up InvalidDataRegion (#13535)
a322ab556a6 is described below
commit a322ab556a6491ec073a9b0c7d4be0c12aeba8c3
Author: 133tosakarin <[email protected]>
AuthorDate: Thu Sep 19 02:42:44 2024 +0800
Fix the issue of restarting DataNode to clean up InvalidDataRegion (#13535)
---
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 58 +++++++-----
.../java/org/apache/iotdb/db/service/DataNode.java | 101 +++++++++++++++------
2 files changed, 108 insertions(+), 51 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index 741e27487d9..3740a7ef0dc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -133,40 +133,27 @@ public class SchemaEngine {
}
}
- /**
- * Scan the database and schema region directories to recover schema regions and return the
- * collected local schema partition info for localSchemaPartitionTable recovery.
- */
- @SuppressWarnings("java:S2142")
- private void initSchemaRegion() {
+ public static Map<String, List<SchemaRegionId>> getLocalSchemaRegionInfo() {
final File schemaDir = new File(config.getSchemaDir());
final File[] sgDirList = schemaDir.listFiles();
-
+ final Map<String, List<SchemaRegionId>> localSchemaPartitionTable = new
HashMap<>();
if (sgDirList == null) {
- return;
+ return localSchemaPartitionTable;
}
-
- // recover SchemaRegion concurrently
- final ExecutorService schemaRegionRecoverPools =
- IoTDBThreadPoolFactory.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
- final List<Future<ISchemaRegion>> futures = new ArrayList<>();
-
for (File file : sgDirList) {
if (!file.isDirectory()) {
continue;
}
- final PartialPath storageGroup;
+ final PartialPath database;
try {
- storageGroup = PartialPath.getDatabasePath(file.getName());
+ database = PartialPath.getDatabasePath(file.getName());
} catch (IllegalPathException illegalPathException) {
// not a legal sg dir
continue;
}
- final File sgDir = new File(config.getSchemaDir(),
storageGroup.getFullPath());
+ final File sgDir = new File(config.getSchemaDir(),
database.getFullPath());
if (!sgDir.exists()) {
continue;
@@ -176,7 +163,7 @@ public class SchemaEngine {
if (schemaRegionDirs == null) {
continue;
}
-
+ List<SchemaRegionId> schemaRegionIds = new ArrayList<>();
for (final File schemaRegionDir : schemaRegionDirs) {
final SchemaRegionId schemaRegionId;
try {
@@ -185,11 +172,38 @@ public class SchemaEngine {
// the dir/file is not schemaRegionDir, ignore this.
continue;
}
- futures.add(
-
schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup,
schemaRegionId)));
+ schemaRegionIds.add(schemaRegionId);
}
+ localSchemaPartitionTable.put(database.getFullPath(), schemaRegionIds);
}
+ return localSchemaPartitionTable;
+ }
+ /**
+ * Scan the database and schema region directories to recover schema regions and return the
+ * collected local schema partition info for localSchemaPartitionTable recovery.
+ */
+ @SuppressWarnings("java:S2142")
+ private void initSchemaRegion() {
+ // recover SchemaRegion concurrently
+ Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
getLocalSchemaRegionInfo();
+ final ExecutorService schemaRegionRecoverPools =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(),
+ ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
+ final List<Future<ISchemaRegion>> futures = new ArrayList<>();
+ localSchemaRegionInfo.forEach(
+ (k, v) -> {
+ for (SchemaRegionId schemaRegionId : v) {
+ try {
+ futures.add(
+ schemaRegionRecoverPools.submit(
+ recoverSchemaRegionTask(new PartialPath(k),
schemaRegionId)));
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
for (final Future<ISchemaRegion> future : futures) {
try {
final ISchemaRegion schemaRegion = future.get();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2dc24b8f78f..7aa5eedd20c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.StartupException;
@@ -123,6 +125,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -215,7 +218,6 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
// Pull and check system configurations from ConfigNode-leader
pullAndCheckSystemConfigurations();
-
if (isFirstStart) {
sendRegisterRequestToConfigNode(true);
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
@@ -544,49 +546,92 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
}
private void removeInvalidRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
+ removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
+ removeInvalidDataRegions(dataNodeConsensusGroupIds);
+ removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
+ removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
+ }
+
+ private void removeInvalidDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
+ Map<String, List<DataRegionId>> localDataRegionInfo =
+ StorageEngine.getInstance().getLocalDataRegionInfo();
+ List<String> allLocalFilesFolders =
TierManager.getInstance().getAllLocalFilesFolders();
+ localDataRegionInfo.forEach(
+ (database, dataRegionIds) -> {
+ for (DataRegionId dataRegionId : dataRegionIds) {
+ if (!dataNodeConsensusGroupIds.contains(dataRegionId)) {
+ removeDataDirRegion(database, dataRegionId,
allLocalFilesFolders);
+ }
+ }
+ });
+ }
+
+ private void removeInvalidConsensusDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
.filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
.collect(Collectors.toList());
-
- List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
-
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
- .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
- .collect(Collectors.toList());
- removeInvalidDataRegions(invalidDataRegionConsensusGroupIds);
- removeInvalidSchemaRegions(invalidSchemaRegionConsensusGroupIds);
- }
-
- private void removeInvalidDataRegions(List<ConsensusGroupId>
invalidConsensusGroupId) {
- logger.info("Remove invalid dataRegion directories... {}",
invalidConsensusGroupId);
- for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+ logger.info("Remove invalid dataRegion directories... {}",
invalidDataRegionConsensusGroupIds);
+ for (ConsensusGroupId consensusGroupId :
invalidDataRegionConsensusGroupIds) {
File oldDir =
new File(
DataRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
- removeRegionsDir(oldDir);
+ removeDir(oldDir);
}
}
- private void removeInvalidSchemaRegions(List<ConsensusGroupId>
invalidConsensusGroupId) {
- logger.info("Remove invalid schemaRegion directories... {}",
invalidConsensusGroupId);
- for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+ private void removeInvalidSchemaRegions(List<ConsensusGroupId>
schemaConsensusGroupIds) {
+ Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
+ SchemaEngine.getLocalSchemaRegionInfo();
+ localSchemaRegionInfo.forEach(
+ (database, schemaRegionIds) -> {
+ for (SchemaRegionId schemaRegionId : schemaRegionIds) {
+ if (!schemaConsensusGroupIds.contains(schemaRegionId)) {
+ removeInvalidSchemaDir(database, schemaRegionId);
+ }
+ }
+ });
+ }
+
+ private void removeDataDirRegion(
+ String database, DataRegionId dataRegionId, List<String> fileFolders) {
+ fileFolders.forEach(
+ folder -> {
+ String regionDir =
+ folder + File.separator + database + File.separator +
dataRegionId.getId();
+ removeDir(new File(regionDir));
+ });
+ }
+
+ private void removeInvalidConsensusSchemaRegions(
+ List<ConsensusGroupId> dataNodeConsensusGroupIds) {
+ List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
+
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
+ .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
+ .collect(Collectors.toList());
+ logger.info(
+ "Remove invalid schemaRegion directories... {}",
invalidSchemaRegionConsensusGroupIds);
+
+ for (ConsensusGroupId consensusGroupId :
invalidSchemaRegionConsensusGroupIds) {
File oldDir =
new File(
SchemaRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
- removeRegionsDir(oldDir);
+ removeDir(oldDir);
}
}
- private void removeRegionsDir(File regionDir) {
+ private void removeInvalidSchemaDir(String database, SchemaRegionId
schemaRegionId) {
+ String systemSchemaDir =
+ config.getSystemDir() + File.separator + database + File.separator +
schemaRegionId.getId();
+ removeDir(new File(systemSchemaDir));
+ }
+
+ private void removeDir(File regionDir) {
if (regionDir.exists()) {
- try {
- FileUtils.recursivelyDeleteFolder(regionDir.getPath());
- logger.info("delete {} succeed.", regionDir.getAbsolutePath());
- } catch (IOException e) {
- logger.error("delete {} failed.", regionDir.getAbsolutePath());
- }
+ FileUtils.deleteDirectoryAndEmptyParent(regionDir);
+ logger.info("delete {} succeed.", regionDir.getAbsolutePath());
} else {
logger.info("delete {} failed, because it does not exist.",
regionDir.getAbsolutePath());
}
@@ -645,12 +690,10 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean {
(endTime - startTime));
List<TConsensusGroupId> consensusGroupIds =
dataNodeRestartResp.getConsensusGroupIds();
- List<ConsensusGroupId> dataNodeConsensusGroupIds =
+ removeInvalidRegions(
consensusGroupIds.stream()
.map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
- .collect(Collectors.toList());
-
- removeInvalidRegions(dataNodeConsensusGroupIds);
+ .collect(Collectors.toList()));
} else {
/* Throw exception when restart is rejected */
throw new StartupException(dataNodeRestartResp.getStatus().getMessage());