This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c5717846148 Fix the conflict between Files.walk and delete file  &&  Catch UncheckedIOException && some other bugs (#12429)
c5717846148 is described below

commit c5717846148635db27a21653b325d0149073f93f
Author: Li Yu Heng <[email protected]>
AuthorDate: Sun Apr 28 19:27:02 2024 +0800

    Fix the conflict between Files.walk and delete file  &&  Catch UncheckedIOException && some other bugs (#12429)
    
    * done
    
    * other small fix
    
    * improve
---
 .../iotdb/confignode/manager/ProcedureManager.java     | 18 ++++++++++++++----
 .../iotdb/confignode/manager/load/cache/LoadCache.java |  4 +++-
 .../procedure/impl/region/AddRegionPeerProcedure.java  |  1 +
 .../impl/region/RemoveRegionPeerProcedure.java         |  8 ++++++--
 .../org/apache/iotdb/consensus/iot/IoTConsensus.java   |  4 ++--
 .../iotdb/consensus/iot/IoTConsensusServerImpl.java    | 11 ++++++++---
 .../apache/iotdb/db/service/RegionMigrateService.java  |  8 ++++++++
 7 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 58cdac06271..387fe1accb7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -107,6 +107,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -567,14 +568,23 @@ public class ProcedureManager {
                   return false;
                 })
             .findAny();
-    if (anotherMigrateProcedure.isPresent()) {
+    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+    if (TConsensusGroupType.DataRegion == regionGroupId.getType()
+        && 
ConsensusFactory.SIMPLE_CONSENSUS.equals(conf.getDataRegionConsensusProtocolClass()))
 {
+      failMessage =
+          "The region you are trying to migrate is using SimpleConsensus, and 
SimpleConsensus not supports region migration.";
+    } else if (TConsensusGroupType.SchemaRegion == regionGroupId.getType()
+        && 
ConsensusFactory.SIMPLE_CONSENSUS.equals(conf.getSchemaRegionConsensusProtocolClass()))
 {
+      failMessage =
+          "The region you are trying to migrate is using SimpleConsensus, and 
SimpleConsensus not supports region migration.";
+    } else if (anotherMigrateProcedure.isPresent()) {
       failMessage =
           String.format(
               "Submit RegionMigrateProcedure failed, "
                   + "because another RegionMigrateProcedure of the same 
consensus group %d is already in processing. "
-                  + "A consensus group is able to have at most 1 
RegionMigrateProcedure at the same time"
-                  + "For further information, you can search [pid%d] in log.",
-              regionGroupId, anotherMigrateProcedure.get().getProcId());
+                  + "A consensus group is able to have at most 1 
RegionMigrateProcedure at the same time. "
+                  + "For further information, please search [pid%d] in log. ",
+              regionGroupId.getId(), 
anotherMigrateProcedure.get().getProcId());
     } else if (originalDataNode == null) {
       failMessage =
           String.format(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index be561937b9b..1ad90a2b02a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -51,6 +51,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -218,7 +219,8 @@ public class LoadCache {
    * @param dataNodeId the specified DataNode
    */
   public void removeRegionCache(TConsensusGroupId regionGroupId, int 
dataNodeId) {
-    regionGroupCacheMap.get(regionGroupId).removeRegionCache(dataNodeId);
+    Optional.of(regionGroupCacheMap.get(regionGroupId))
+        .ifPresent(cache -> cache.removeRegionCache(dataNodeId));
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index bebf03f6792..b8dd6075431 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -96,6 +96,7 @@ public class AddRegionPeerProcedure
           setKillPoint(state);
           if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
             rollback(env, handler);
+            return Flow.NO_MORE_STATE;
           }
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 17fdc903295..1bdf473c9eb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -106,7 +106,7 @@ public class RemoveRegionPeerProcedure
                 getProcId(),
                 state);
             setNextState(DELETE_OLD_REGION_PEER);
-            break;
+            return Flow.HAS_MORE_STATE;
           }
           TRegionMigrateResult removeRegionPeerResult =
               handler.waitTaskFinish(this.getProcId(), coordinator);
@@ -115,6 +115,8 @@ public class RemoveRegionPeerProcedure
                 "[pid{}][RemoveRegion] {} executed failed, procedure will 
continue. You should manually clear peer list.",
                 getProcId(),
                 state);
+            setNextState(DELETE_OLD_REGION_PEER);
+            return Flow.HAS_MORE_STATE;
           }
           setNextState(DELETE_OLD_REGION_PEER);
           break;
@@ -129,7 +131,7 @@ public class RemoveRegionPeerProcedure
                 "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER task submitted 
failed, procedure will continue. You should manually delete region file.",
                 getProcId());
             setNextState(REMOVE_REGION_LOCATION_CACHE);
-            break;
+            return Flow.HAS_MORE_STATE;
           }
           TRegionMigrateResult deleteOldRegionPeerResult =
               handler.waitTaskFinish(this.getProcId(), targetDataNode);
@@ -137,6 +139,8 @@ public class RemoveRegionPeerProcedure
             LOGGER.warn(
                 "[pid{}][RemoveRegion] DELETE_OLD_REGION_PEER executed failed, 
procedure will continue. You should manually delete region file.",
                 getProcId());
+            setNextState(REMOVE_REGION_LOCATION_CACHE);
+            return Flow.HAS_MORE_STATE;
           }
           setNextState(REMOVE_REGION_LOCATION_CACHE);
           break;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 2693cd8716b..46d911608f9 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -280,13 +280,13 @@ public class IoTConsensus implements IConsensus {
         (k, v) -> {
           exist.set(true);
           v.stop();
-          FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir, 
groupId)));
           return null;
         });
-    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
     if (!exist.get()) {
       throw new ConsensusGroupNotExistException(groupId);
     }
+    FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir, 
groupId)));
+    KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
   }
 
   @Override
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 34f2de4aec5..42aac9f4641 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -74,6 +74,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
@@ -962,7 +963,7 @@ public class IoTConsensusServerImpl {
   }
 
   private void renameTmpConfigurationFileToRemoveSuffix() throws IOException {
-    try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+    try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
       List<Path> paths =
           stream
               .filter(Files::isRegularFile)
@@ -978,18 +979,20 @@ public class IoTConsensusServerImpl {
           try {
             Files.delete(targetFile.toPath());
           } catch (IOException e) {
-            logger.error("Unexpected error occurs when delete file: {}", 
targetPath);
+            logger.error("Unexpected error occurs when delete file: {}", 
targetPath, e);
           }
         }
         if (!filePath.toFile().renameTo(targetFile)) {
           logger.error("Unexpected error occurs when rename file: {} -> {}", 
filePath, targetPath);
         }
       }
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
     }
   }
 
   private void deleteConfiguration() throws IOException {
-    try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+    try (Stream<Path> stream = Files.list(Paths.get(storageDir))) {
       stream
           .filter(Files::isRegularFile)
           .filter(filePath -> 
filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME))
@@ -1004,6 +1007,8 @@ public class IoTConsensusServerImpl {
                       e);
                 }
               });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 7061a6b5c38..4a455637220 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -304,6 +304,10 @@ public class RegionMigrateService implements IService {
               regionId,
               i,
               e);
+        } catch (Exception e) {
+          addPeerSucceed = false;
+          throwable = e;
+          taskLogger.warn("Unexpected exception", e);
         }
         if (addPeerSucceed || throwable instanceof InterruptedException) {
           break;
@@ -402,6 +406,10 @@ public class RegionMigrateService implements IService {
               regionId,
               i,
               e);
+        } catch (Exception e) {
+          removePeerSucceed = false;
+          throwable = e;
+          taskLogger.warn("Unexpected exception", e);
         }
         if (removePeerSucceed || throwable instanceof InterruptedException) {
           break;

Reply via email to