This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c5717846148 Fix the conflict between Files.walk and delete file &&
Catch UncheckedIOException && some other bugs (#12429)
c5717846148 is described below
commit c5717846148635db27a21653b325d0149073f93f
Author: Li Yu Heng <[email protected]>
AuthorDate: Sun Apr 28 19:27:02 2024 +0800
Fix the conflict between Files.walk and delete file && Catch
UncheckedIOException && some other bugs (#12429)
* done
* other small fix
* improve
---
.../iotdb/confignode/manager/ProcedureManager.java | 18 ++++++++++++++----
.../iotdb/confignode/manager/load/cache/LoadCache.java | 4 +++-
.../procedure/impl/region/AddRegionPeerProcedure.java | 1 +
.../impl/region/RemoveRegionPeerProcedure.java | 8 ++++++--
.../org/apache/iotdb/consensus/iot/IoTConsensus.java | 4 ++--
.../iotdb/consensus/iot/IoTConsensusServerImpl.java | 11 ++++++++---
.../apache/iotdb/db/service/RegionMigrateService.java | 8 ++++++++
7 files changed, 42 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 58cdac06271..387fe1accb7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -107,6 +107,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.rpc.RpcUtils;
@@ -567,14 +568,23 @@ public class ProcedureManager {
return false;
})
.findAny();
- if (anotherMigrateProcedure.isPresent()) {
+ ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+ if (TConsensusGroupType.DataRegion == regionGroupId.getType()
+ &&
ConsensusFactory.SIMPLE_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass()))
{
+ failMessage =
+ "The region you are trying to migrate is using SimpleConsensus, and
SimpleConsensus not supports region migration.";
+ } else if (TConsensusGroupType.SchemaRegion == regionGroupId.getType()
+ &&
ConsensusFactory.SIMPLE_CONSENSUS.equals(conf.getSchemaRegionConsensusProtocolClass()))
{
+ failMessage =
+ "The region you are trying to migrate is using SimpleConsensus, and
SimpleConsensus not supports region migration.";
+ } else if (anotherMigrateProcedure.isPresent()) {
failMessage =
String.format(
"Submit RegionMigrateProcedure failed, "
+ "because another RegionMigrateProcedure of the same
consensus group %d is already in processing. "
- + "A consensus group is able to have at most 1
RegionMigrateProcedure at the same time"
- + "For further information, you can search [pid%d] in log.",
- regionGroupId, anotherMigrateProcedure.get().getProcId());
+ + "A consensus group is able to have at most 1
RegionMigrateProcedure at the same time. "
+ + "For further information, please search [pid%d] in log. ",
+ regionGroupId.getId(),
anotherMigrateProcedure.get().getProcId());
} else if (originalDataNode == null) {
failMessage =
String.format(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index be561937b9b..1ad90a2b02a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -51,6 +51,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -218,7 +219,8 @@ public class LoadCache {
* @param dataNodeId the specified DataNode
*/
public void removeRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
- regionGroupCacheMap.get(regionGroupId).removeRegionCache(dataNodeId);
+ Optional.ofNullable(regionGroupCacheMap.get(regionGroupId)) // Optional.of throws NullPointerException on null; ofNullable makes a missing group a no-op
+ .ifPresent(cache -> cache.removeRegionCache(dataNodeId));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index bebf03f6792..b8dd6075431 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -96,6 +96,7 @@ public class AddRegionPeerProcedure
setKillPoint(state);
if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
rollback(env, handler);
+ return Flow.NO_MORE_STATE;
}
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 17fdc903295..1bdf473c9eb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -106,7 +106,7 @@ public class RemoveRegionPeerProcedure
getProcId(),
state);
setNextState(DELETE_OLD_REGION_PEER);
- break;
+ return Flow.HAS_MORE_STATE;
}
TRegionMigrateResult removeRegionPeerResult =
handler.waitTaskFinish(this.getProcId(), coordinator);
@@ -115,6 +115,8 @@ public class RemoveRegionPeerProcedure
"[pid{}][RemoveRegion] {} executed failed, procedure will
continue. You should manually clear peer list.",
getProcId(),
state);
+ setNextState(DELETE_OLD_REGION_PEER);
+ return Flow.HAS_MORE_STATE;
}
setNextState(DELETE_OLD_REGION_PEER);
break;
@@ -129,7 +131,7 @@ public class RemoveRegionPeerProcedure
"[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted
failed, procedure will continue. You should manually delete region file.",
getProcId());
setNextState(REMOVE_REGION_LOCATION_CACHE);
- break;
+ return Flow.HAS_MORE_STATE;
}
TRegionMigrateResult deleteOldRegionPeerResult =
handler.waitTaskFinish(this.getProcId(), targetDataNode);
@@ -137,6 +139,8 @@ public class RemoveRegionPeerProcedure
LOGGER.warn(
"[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed,
procedure will continue. You should manually delete region file.",
getProcId());
+ setNextState(REMOVE_REGION_LOCATION_CACHE);
+ return Flow.HAS_MORE_STATE;
}
setNextState(REMOVE_REGION_LOCATION_CACHE);
break;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 2693cd8716b..46d911608f9 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -280,13 +280,13 @@ public class IoTConsensus implements IConsensus {
(k, v) -> {
exist.set(true);
v.stop();
- FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir,
groupId)));
return null;
});
- KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
if (!exist.get()) {
throw new ConsensusGroupNotExistException(groupId);
}
+ FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir,
groupId)));
+ KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
}
@Override
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 34f2de4aec5..42aac9f4641 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -74,6 +74,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
@@ -962,7 +963,7 @@ public class IoTConsensusServerImpl {
}
private void renameTmpConfigurationFileToRemoveSuffix() throws IOException {
- try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+ try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
List<Path> paths =
stream
.filter(Files::isRegularFile)
@@ -978,18 +979,20 @@ public class IoTConsensusServerImpl {
try {
Files.delete(targetFile.toPath());
} catch (IOException e) {
- logger.error("Unexpected error occurs when delete file: {}",
targetPath);
+ logger.error("Unexpected error occurs when delete file: {}",
targetPath, e);
}
}
if (!filePath.toFile().renameTo(targetFile)) {
logger.error("Unexpected error occurs when rename file: {} -> {}",
filePath, targetPath);
}
}
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
}
private void deleteConfiguration() throws IOException {
- try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+ try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
stream
.filter(Files::isRegularFile)
.filter(filePath ->
filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME))
@@ -1004,6 +1007,8 @@ public class IoTConsensusServerImpl {
e);
}
});
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 7061a6b5c38..4a455637220 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -304,6 +304,10 @@ public class RegionMigrateService implements IService {
regionId,
i,
e);
+ } catch (Exception e) {
+ addPeerSucceed = false;
+ throwable = e;
+ taskLogger.warn("Unexpected exception", e);
}
if (addPeerSucceed || throwable instanceof InterruptedException) {
break;
@@ -402,6 +406,10 @@ public class RegionMigrateService implements IService {
regionId,
i,
e);
+ } catch (Exception e) {
+ removePeerSucceed = false;
+ throwable = e;
+ taskLogger.warn("Unexpected exception", e);
}
if (removePeerSucceed || throwable instanceof InterruptedException) {
break;