This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch CorrectTimeout in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c2a9b93f555bd5fa0d7b13d0242736bd86cb6f96 Author: JackieTien97 <[email protected]> AuthorDate: Tue May 30 16:31:22 2023 +0800 [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's timeout --- .../mpp/execution/fragment/FragmentInstanceExecution.java | 14 +++++++++++--- .../db/mpp/execution/fragment/FragmentInstanceManager.java | 11 ++++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index b68ec33f096..4cb85b0ae93 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -53,6 +53,8 @@ public class FragmentInstanceExecution { private final FragmentInstanceStateMachine stateMachine; + private final long timeoutInMs; + private long lastHeartbeat; public static FragmentInstanceExecution createFragmentInstanceExecution( @@ -66,9 +68,9 @@ public class FragmentInstanceExecution { long timeOut) throws CpuNotEnoughException, MemoryNotEnoughException { FragmentInstanceExecution execution = - new FragmentInstanceExecution(instanceId, context, drivers, sinkHandle, stateMachine); + new FragmentInstanceExecution( + instanceId, context, drivers, sinkHandle, stateMachine, timeOut); execution.initialize(failedInstances, scheduler); - LOGGER.debug("timeout is {}ms.", timeOut); scheduler.submitDrivers(instanceId.getQueryId(), drivers, timeOut, context.getSessionInfo()); return execution; } @@ -78,12 +80,14 @@ public class FragmentInstanceExecution { FragmentInstanceContext context, List<IDriver> drivers, ISink sink, - FragmentInstanceStateMachine stateMachine) { + FragmentInstanceStateMachine stateMachine, + long timeoutInMs) { this.instanceId = instanceId; this.context = context; this.drivers = drivers; this.sink = sink; this.stateMachine = stateMachine; + this.timeoutInMs = timeoutInMs; } public void recordHeartbeat() { @@ -110,6 +114,10 @@ public class FragmentInstanceExecution { return context.getStartTime(); } + public long getTimeoutInMs() { + return timeoutInMs; + } + public FragmentInstanceStateMachine getStateMachine() { return stateMachine; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 1cf2718e9fe..b0f816f21e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -70,9 +70,6 @@ public class FragmentInstanceManager { // record failed instances count private final CounterStat failedInstances = new CounterStat(); - private static final long QUERY_TIMEOUT_MS = - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); - private final ExecutorService intoOperationExecutor; private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = @@ -314,8 +311,12 @@ public class FragmentInstanceManager { instanceExecution.forEach( (key, execution) -> { if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING - && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) { - execution.getStateMachine().failed(new TimeoutException()); + && (now - execution.getStartTime()) > execution.getTimeoutInMs()) { + execution + .getStateMachine() + .failed( + new TimeoutException( + "Query has executed more than " + execution.getTimeoutInMs() + "ms")); } }); }
