This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ChangeLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f9d1cc4957a64279ab85ab84edc2975530ceace
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Aug 31 17:30:07 2022 +0800

    Add flushing state timeout detection in FragmentInstanceManager
---
 .../execution/exchange/MPPDataExchangeManager.java | 2 +-
 .../fragment/FragmentInstanceContext.java | 4 ++++
 .../fragment/FragmentInstanceManager.java | 25 +++++++++++++++++++++-
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b2b32222d9..2742816e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -155,7 +155,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         // have already been stopped. For example, in the query whit LimitOperator, the downstream
         // FragmentInstance may be finished, although the upstream is still working.
         logger.warn(
-            "received NewDataBlockEvent but the upstream FragmentInstance[{}] is not found",
+            "received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found",
             e.getTargetFragmentInstanceId());
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 593947d080..62ab803edb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -188,6 +188,10 @@ public class FragmentInstanceContext extends QueryContext {
     return executionEndTime.get();
   }
 
+  public long getStartTime() {
+    return executionStartTime.get();
+  }
+
   public FragmentInstanceInfo getInstanceInfo() {
     return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index f6ebb1b42e..f27aedb217 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -41,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -63,6 +65,9 @@ public class FragmentInstanceManager {
   // record failed instances count
   private final CounterStat failedInstances = new CounterStat();
 
+  private static final long QUERY_TIMEOUT_MS =
+      IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
   }
@@ -79,6 +84,12 @@ public class FragmentInstanceManager {
 
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         instanceManagementExecutor, this::removeOldInstances, 200, 200, TimeUnit.MILLISECONDS);
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        instanceManagementExecutor,
+        this::cancelTimeoutFlushingInstances,
+        200,
+        200,
+        TimeUnit.MILLISECONDS);
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
@@ -175,7 +186,7 @@ public class FragmentInstanceManager {
 
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
-    logger.error("cancelTask");
+    logger.debug("cancelTask");
     requireNonNull(instanceId, "taskId is null");
 
     FragmentInstanceContext context = instanceContext.remove(instanceId);
@@ -223,6 +234,18 @@ public class FragmentInstanceManager {
             });
   }
 
+  private void cancelTimeoutFlushingInstances() {
+    long now = System.currentTimeMillis();
+    instanceContext.entrySet().stream()
+        .filter(
+            entry -> {
+              FragmentInstanceContext context = entry.getValue();
+              return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING
+                  && (now - context.getStartTime()) > QUERY_TIMEOUT_MS;
+            })
+        .forEach(entry -> entry.getValue().failed(new TimeoutException()));
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
