This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/FixBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/FixBug by this push:
     new e935217765a more quite
e935217765a is described below

commit e935217765a455771942897ee63907e85b9baf0b
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Aug 29 12:42:54 2024 +0800

    more quite
---
 .../scheduler/FixedRateFragInsStateTracker.java    | 52 +++++++++++++---------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 608abea2ba7..a1932753d46 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,10 +88,7 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
       return res;
     }
     for (FragmentInstanceId fragmentInstanceId : instanceIds) {
-      InstanceStateMetrics stateMetrics = 
instanceStateMap.get(fragmentInstanceId);
-      if (stateMetrics == null
-          || stateMetrics.lastState == null
-          || !stateMetrics.lastState.isDone()) {
+      if (unfinished(fragmentInstanceId)) {
         // FI whose state has not been updated is considered to be 
unfinished.(In Query with limit
         // clause, it's possible that the query is finished before the state 
of FI being recorded.)
         res.add(fragmentInstanceId);
@@ -100,6 +97,15 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
     return res;
   }
 
+  private boolean unfinished(FragmentInstanceId fragmentInstanceId) {
+    InstanceStateMetrics stateMetrics = 
instanceStateMap.get(fragmentInstanceId);
+    // FI whose state has not been updated is considered to be unfinished.(In 
Query with limit
+    // clause, it's possible that the query is finished before the state of FI 
being recorded.)
+    return stateMetrics == null
+        || stateMetrics.lastState == null
+        || !stateMetrics.lastState.isDone();
+  }
+
   @Override
   public synchronized void abort() {
     aborted = true;
@@ -117,27 +123,29 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
 
   private void fetchStateAndUpdate() {
     for (FragmentInstance instance : instances) {
-      try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
-        FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
-        synchronized (this) {
-          InstanceStateMetrics metrics =
-              instanceStateMap.computeIfAbsent(
-                  instance.getId(), k -> new 
InstanceStateMetrics(instance.isRoot()));
-          if (needPrintState(
-              metrics.lastState, instanceInfo.getState(), 
metrics.durationToLastPrintInMS)) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("[PrintFIState] state is {}", 
instanceInfo.getState());
+      if (unfinished(instance.getId())) {
+        try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+          FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
+          synchronized (this) {
+            InstanceStateMetrics metrics =
+                instanceStateMap.computeIfAbsent(
+                    instance.getId(), k -> new 
InstanceStateMetrics(instance.isRoot()));
+            if (needPrintState(
+                metrics.lastState, instanceInfo.getState(), 
metrics.durationToLastPrintInMS)) {
+              if (logger.isDebugEnabled()) {
+                logger.debug("[PrintFIState] state is {}", 
instanceInfo.getState());
+              }
+              metrics.reset(instanceInfo.getState());
+            } else {
+              metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
             }
-            metrics.reset(instanceInfo.getState());
-          } else {
-            metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
-          }
 
-          updateQueryState(instance.getId(), instanceInfo);
+            updateQueryState(instance.getId(), instanceInfo);
+          }
+        } catch (ClientManagerException | TException e) {
+          // TODO: do nothing ?
+          logger.warn("error happened while fetching query state", e);
         }
-      } catch (ClientManagerException | TException e) {
-        // TODO: do nothing ?
-        logger.warn("error happened while fetching query state", e);
       }
     }
   }

Reply via email to