This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ChangeLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f9d1cc4957a64279ab85ab84edc2975530ceace
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Aug 31 17:30:07 2022 +0800

    Add flushing state timeout detection in FragmentInstanceManager
---
 .../execution/exchange/MPPDataExchangeManager.java |  2 +-
 .../fragment/FragmentInstanceContext.java          |  4 ++++
 .../fragment/FragmentInstanceManager.java          | 25 +++++++++++++++++++++-
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b2b32222d9..2742816e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -155,7 +155,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           // have already been stopped. For example, in the query whit LimitOperator, the downstream
           // FragmentInstance may be finished, although the upstream is still working.
           logger.warn(
-              "received NewDataBlockEvent but the upstream FragmentInstance[{}] is not found",
+              "received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found",
               e.getTargetFragmentInstanceId());
           return;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 593947d080..62ab803edb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -188,6 +188,10 @@ public class FragmentInstanceContext extends QueryContext {
     return executionEndTime.get();
   }
 
+  public long getStartTime() {
+    return executionStartTime.get();
+  }
+
   public FragmentInstanceInfo getInstanceInfo() {
     return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index f6ebb1b42e..f27aedb217 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -41,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -63,6 +65,9 @@ public class FragmentInstanceManager {
   // record failed instances count
   private final CounterStat failedInstances = new CounterStat();
 
+  private static final long QUERY_TIMEOUT_MS =
+      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -79,6 +84,12 @@ public class FragmentInstanceManager {
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        instanceManagementExecutor,
+        this::cancelTimeoutFlushingInstances,
+        200,
+        200,
+        TimeUnit.MILLISECONDS);
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
@@ -175,7 +186,7 @@ public class FragmentInstanceManager {
 
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
-    logger.error("cancelTask");
+    logger.debug("cancelTask");
     requireNonNull(instanceId, "taskId is null");
 
     FragmentInstanceContext context = instanceContext.remove(instanceId);
@@ -223,6 +234,18 @@ public class FragmentInstanceManager {
             });
   }
 
+  private void cancelTimeoutFlushingInstances() {
+    long now = System.currentTimeMillis();
+    instanceContext.entrySet().stream()
+        .filter(
+            entry -> {
+              FragmentInstanceContext context = entry.getValue();
+              return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+                  && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
+            })
+        .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}

Reply via email to