This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 43a50b1fd17 [To rel/1.1] [IOTDB-5939] Correct Flusing Task Timeout 
Detect Thread's timeout
43a50b1fd17 is described below

commit 43a50b1fd177c2a270729af7127bbe665f505a91
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 30 16:44:14 2023 +0800

    [To rel/1.1] [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's 
timeout
---
 .../mpp/execution/fragment/FragmentInstanceExecution.java  | 14 +++++++++++---
 .../db/mpp/execution/fragment/FragmentInstanceManager.java | 11 ++++++-----
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 765837b33e3..35a20199a27 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -47,6 +47,8 @@ public class FragmentInstanceExecution {
 
   private final FragmentInstanceStateMachine stateMachine;
 
+  private final long timeoutInMs;
+
   private long lastHeartbeat;
 
   public static FragmentInstanceExecution createFragmentInstanceExecution(
@@ -59,9 +61,9 @@ public class FragmentInstanceExecution {
       CounterStat failedInstances,
       long timeOut) {
     FragmentInstanceExecution execution =
-        new FragmentInstanceExecution(instanceId, context, drivers, 
sinkHandle, stateMachine);
+        new FragmentInstanceExecution(
+            instanceId, context, drivers, sinkHandle, stateMachine, timeOut);
     execution.initialize(failedInstances, scheduler);
-    LOGGER.debug("timeout is {}ms.", timeOut);
     scheduler.submitDrivers(instanceId.getQueryId(), drivers, timeOut);
     return execution;
   }
@@ -71,12 +73,14 @@ public class FragmentInstanceExecution {
       FragmentInstanceContext context,
       List<IDriver> drivers,
       ISink sink,
-      FragmentInstanceStateMachine stateMachine) {
+      FragmentInstanceStateMachine stateMachine,
+      long timeoutInMs) {
     this.instanceId = instanceId;
     this.context = context;
     this.drivers = drivers;
     this.sink = sink;
     this.stateMachine = stateMachine;
+    this.timeoutInMs = timeoutInMs;
   }
 
   public void recordHeartbeat() {
@@ -103,6 +107,10 @@ public class FragmentInstanceExecution {
     return context.getStartTime();
   }
 
+  public long getTimeoutInMs() {
+    return timeoutInMs;
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index d563f9cf24d..0b092ca0b8c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -70,9 +70,6 @@ public class FragmentInstanceManager {
   // record failed instances count
   private final CounterStat failedInstances = new CounterStat();
 
-  private static final long QUERY_TIMEOUT_MS =
-      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
-
   private final ExecutorService intoOperationExecutor;
 
   private static final QueryMetricsManager QUERY_METRICS = 
QueryMetricsManager.getInstance();
@@ -304,8 +301,12 @@ public class FragmentInstanceManager {
     instanceExecution.forEach(
         (key, execution) -> {
           if (execution.getStateMachine().getState() == 
FragmentInstanceState.FLUSHING
-              && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
-            execution.getStateMachine().failed(new TimeoutException());
+              && (now - execution.getStartTime()) > 
execution.getTimeoutInMs()) {
+            execution
+                .getStateMachine()
+                .failed(
+                    new TimeoutException(
+                        "Query has executed more than " + 
execution.getTimeoutInMs() + "ms"));
           }
         });
   }

Reply via email to