This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch timeout in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 210a8865997166fafc7244423567a679ae9d9c37 Author: lancelly <[email protected]> AuthorDate: Fri Oct 20 16:11:39 2023 +0800 fix --- .../db/queryengine/execution/schedule/DriverScheduler.java | 7 ++++++- .../execution/schedule/DriverTaskTimeoutSentinelThread.java | 11 +++++++++++ .../db/queryengine/execution/schedule/ITaskScheduler.java | 8 ++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java index 9228adbd77f..1ad010eead7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java @@ -263,7 +263,7 @@ public class DriverScheduler implements IDriverScheduler, IService { for (DriverTask task : submittedTasks) { registerTaskToQueryMap(queryId, task); } - timeoutQueue.push(submittedTasks.get(submittedTasks.size() - 1)); + scheduler.enforceTimeLimit(submittedTasks.get(submittedTasks.size() - 1)); for (DriverTask task : submittedTasks) { submitTaskToReadyQueue(task); } @@ -553,6 +553,11 @@ public class DriverScheduler implements IDriverScheduler, IService { clearDriverTask(task); } + @Override + public void enforceTimeLimit(DriverTask task) { + timeoutQueue.push(task); + } + @Override public void toAborted(DriverTask task) { try (SetThreadName driverTaskName = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java index 58315930f17..424cd77f65f 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java @@ -31,6 +31,8 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread { private static final Logger LOGGER = LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class); + private final long SLEEP_BOUND = 5 * 1000L; + public DriverTaskTimeoutSentinelThread( String workerId, ThreadGroup tg, @@ -49,9 +51,18 @@ public class DriverTaskTimeoutSentinelThread extends AbstractDriverThread { if (task.isEndState()) { return; } + long waitTime = task.getDDL() - System.currentTimeMillis(); + // If the waitTime is more than SLEEP_BOUND, re-push the task in the TimeoutQueue. SLEEP_BOUND + // ensures that DriverTaskTimeoutSentinelThread won't sleep for too long when the waitTime + // of the current task is long. + if (waitTime > SLEEP_BOUND) { + scheduler.enforceTimeLimit(task); + return; + } } finally { task.unlock(); } + // If this task is not timeout, we can wait it to timeout. long waitTime = task.getDDL() - System.currentTimeMillis(); while (waitTime > 0L) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java index 4214b1455ea..0240f646a54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java @@ -70,4 +70,12 @@ public interface ITaskScheduler { * @param task the task to be switched. */ void toAborted(DriverTask task); + + /** + * Add a time watch for the task. The task will be aborted if it can not be finished in the given + * amount of time. 
+ * + * @param task the target task. + */ + void enforceTimeLimit(DriverTask task); }
