This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 2eda8bec8fd Fix query stuck while DN restarting & keep quiet while 
cleaning sort tmp file
2eda8bec8fd is described below

commit 2eda8bec8fd1ba0fac187ed2da88be517256ebff
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Aug 29 16:35:36 2024 +0800

    Fix query stuck while DN restarting & keep quiet while cleaning sort tmp 
file
---
 .../db/queryengine/execution/driver/Driver.java    |  2 +-
 .../fragment/FragmentInstanceExecution.java        |  2 +-
 .../execution/fragment/FragmentInstanceState.java  |  2 +-
 .../scheduler/FixedRateFragInsStateTracker.java    | 60 ++++++++++++++--------
 .../org/apache/iotdb/commons/utils/FileUtils.java  | 10 +++-
 5 files changed, 50 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 1211df6b74b..812c84298fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -445,7 +445,7 @@ public abstract class Driver implements IDriver {
     if (!tmpPipeLineDir.exists()) {
       return;
     }
-    FileUtils.deleteFileOrDirectory(tmpPipeLineDir);
+    FileUtils.deleteFileOrDirectory(tmpPipeLineDir, true);
   }
 
   private static Throwable addSuppressedException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index c42834f5190..d1c8cc8ff6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -358,7 +358,7 @@ public class FragmentInstanceExecution {
               + File.separator;
       File tmpFile = new File(tmpFilePath);
       if (tmpFile.exists()) {
-        FileUtils.deleteFileOrDirectory(tmpFile);
+        FileUtils.deleteFileOrDirectory(tmpFile, true);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
index 77531e94494..092b1be3816 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
@@ -47,7 +47,7 @@ public enum FragmentInstanceState {
   /** Instance execution failed. */
   FAILED(true, true),
   /** Instance is not found. */
-  NO_SUCH_INSTANCE(false, false);
+  NO_SUCH_INSTANCE(false, true);
 
   public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
       Stream.of(FragmentInstanceState.values())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 31b5742a9d0..a1932753d46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,10 +88,7 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
       return res;
     }
     for (FragmentInstanceId fragmentInstanceId : instanceIds) {
-      InstanceStateMetrics stateMetrics = 
instanceStateMap.get(fragmentInstanceId);
-      if (stateMetrics == null
-          || stateMetrics.lastState == null
-          || !stateMetrics.lastState.isDone()) {
+      if (unfinished(fragmentInstanceId)) {
         // FI whose state has not been updated is considered to be 
unfinished.(In Query with limit
         // clause, it's possible that the query is finished before the state 
of FI being recorded.)
         res.add(fragmentInstanceId);
@@ -100,6 +97,15 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
     return res;
   }
 
+  private boolean unfinished(FragmentInstanceId fragmentInstanceId) {
+    InstanceStateMetrics stateMetrics = 
instanceStateMap.get(fragmentInstanceId);
+    // FI whose state has not been updated is considered to be unfinished.(In 
Query with limit
+    // clause, it's possible that the query is finished before the state of FI 
being recorded.)
+    return stateMetrics == null
+        || stateMetrics.lastState == null
+        || !stateMetrics.lastState.isDone();
+  }
+
   @Override
   public synchronized void abort() {
     aborted = true;
@@ -117,32 +123,42 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
 
   private void fetchStateAndUpdate() {
     for (FragmentInstance instance : instances) {
-      try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
-        FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
-        synchronized (this) {
-          InstanceStateMetrics metrics =
-              instanceStateMap.computeIfAbsent(
-                  instance.getId(), k -> new 
InstanceStateMetrics(instance.isRoot()));
-          if (needPrintState(
-              metrics.lastState, instanceInfo.getState(), 
metrics.durationToLastPrintInMS)) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("[PrintFIState] state is {}", 
instanceInfo.getState());
+      if (unfinished(instance.getId())) {
+        try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+          FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
+          synchronized (this) {
+            InstanceStateMetrics metrics =
+                instanceStateMap.computeIfAbsent(
+                    instance.getId(), k -> new 
InstanceStateMetrics(instance.isRoot()));
+            if (needPrintState(
+                metrics.lastState, instanceInfo.getState(), 
metrics.durationToLastPrintInMS)) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("[PrintFIState] state is {}", 
instanceInfo.getState());
+              }
+              metrics.reset(instanceInfo.getState());
+            } else {
+              metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
             }
-            metrics.reset(instanceInfo.getState());
-          } else {
-            metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
-          }
 
-          updateQueryState(instance.getId(), instanceInfo);
+            updateQueryState(instance.getId(), instanceInfo);
+          }
+        } catch (ClientManagerException | TException e) {
+          // TODO: do nothing ?
+          logger.warn("error happened while fetching query state", e);
         }
-      } catch (ClientManagerException | TException e) {
-        // TODO: do nothing ?
-        logger.warn("error happened while fetching query state", e);
       }
     }
   }
 
   private void updateQueryState(FragmentInstanceId instanceId, 
FragmentInstanceInfo instanceInfo) {
+    // no such instance may be caused by DN restarting
+    if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
+      stateMachine.transitionToFailed(
+          new RuntimeException(
+              String.format(
+                  "FragmentInstance[%s] is failed. %s, may be caused by DN 
restarting.",
+                  instanceId, instanceInfo.getMessage())));
+    }
     if (instanceInfo.getState().isFailed()) {
       if (instanceInfo.getFailureInfoList() == null
           || instanceInfo.getFailureInfoList().isEmpty()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 71498bbe220..1fca92394af 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -62,6 +62,10 @@ public class FileUtils {
   }
 
   public static void deleteFileOrDirectory(File file) {
+    deleteFileOrDirectory(file, false);
+  }
+
+  public static void deleteFileOrDirectory(File file, boolean 
quiteForNoSuchFile) {
     if (file.isDirectory()) {
       for (File subfile : file.listFiles()) {
         deleteFileOrDirectory(subfile);
@@ -69,7 +73,11 @@ public class FileUtils {
     }
     try {
       Files.delete(file.toPath());
-    } catch (NoSuchFileException | DirectoryNotEmptyException e) {
+    } catch (NoSuchFileException e) {
+      if (!quiteForNoSuchFile) {
+        LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
+      }
+    } catch (DirectoryNotEmptyException e) {
       LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
     } catch (Exception e) {
       LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);

Reply via email to