This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d0f517de5 Add more log to the cancellation operation of FragmentInstanceStateTracker (#6647)
7d0f517de5 is described below

commit 7d0f517de5616ab33d5021dac083ad296bcf6bd8
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jul 13 09:07:36 2022 +0800

    Add more log to the cancellation operation of FragmentInstanceStateTracker (#6647)
---
 .../scheduler/FixedRateFragInsStateTracker.java    | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 6388e35e18..55e93974d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -43,9 +43,13 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
 
   private static final Logger logger = 
LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
 
+  private static final long SAME_STATE_PRINT_RATE_IN_MS = 10 * 60 * 1000;
+
   // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
+  private volatile FragmentInstanceState lastState;
+  private volatile long durationToLastPrintInMS;
 
   public FixedRateFragInsStateTracker(
       QueryStateMachine stateMachine,
@@ -69,8 +73,15 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
 
   @Override
   public void abort() {
+    logger.info("start to abort state tracker");
     if (trackTask != null) {
-      trackTask.cancel(true);
+      logger.info("start to cancel fixed rate tracking task");
+      boolean cancelResult = trackTask.cancel(true);
+      if (!cancelResult) {
+        logger.error("cancel state tracking task failed. {}", 
trackTask.isCancelled());
+      } else {
+        logger.info("cancellation succeeds");
+      }
     }
   }
 
@@ -78,7 +89,13 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
         FragmentInstanceState state = fetchState(instance);
-        logger.info("State is {}", state);
+        if (needPrintState(lastState, state, durationToLastPrintInMS)) {
+          logger.info("State is {}", state);
+          lastState = state;
+          durationToLastPrintInMS = 0;
+        } else {
+          durationToLastPrintInMS += STATE_FETCH_INTERVAL_IN_MS;
+        }
 
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
@@ -89,4 +106,12 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       }
     }
   }
+
+  private boolean needPrintState(
+      FragmentInstanceState previous, FragmentInstanceState current, long 
durationToLastPrintInMS) {
+    if (current != previous) {
+      return true;
+    }
+    return durationToLastPrintInMS >= SAME_STATE_PRINT_RATE_IN_MS;
+  }
 }

Reply via email to