This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8c0566ad3bd89e3075299e5df94011888c307a61 Author: Jackie Tien <[email protected]> AuthorDate: Thu Jun 11 19:28:45 2026 +0800 Fix driver scheduler ready queue reservation leak (#17919) --- .../queue/IndexedBlockingReserveQueue.java | 36 +++++++++++++++++-- .../execution/schedule/DriverScheduler.java | 15 +++++--- .../multilevelqueue/MultilevelPriorityQueue.java | 10 ++++++ .../execution/schedule/task/DriverTask.java | 13 +++++++ .../metric/DriverSchedulerMetricSet.java | 13 +++++++ .../schedule/DefaultDriverSchedulerTest.java | 41 ++++++++++++++++++++++ 6 files changed, 121 insertions(+), 7 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java index ce18e18b28a..b1040e7c64a 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java @@ -49,6 +49,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible> E output = pollFirst(); size--; reservedSize++; + markReserved(output); return output; } @@ -68,7 +69,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible> throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL); } pushToQueue(element); - reservedSize--; + decreaseReservedSizeIfNecessary(element); size++; this.notifyAll(); } @@ -76,7 +77,36 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible> /** * For task that is not in readyQueue when it's cleared, it won't be added into the queue again. */ - public synchronized void decreaseReservedSize() { - this.reservedSize--; + public synchronized boolean decreaseReservedSize(E element) { + if (element == null) { + throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL); + } + return decreaseReservedSizeIfNecessary(element); + } + + public final synchronized int getReservedSize() { + return reservedSize; + } + + @Override + public synchronized void clear() { + super.clear(); + this.reservedSize = 0; + } + + protected void markReserved(E element) { + // Do nothing by default. + } + + protected boolean releaseReserved(E element) { + return true; + } + + private boolean decreaseReservedSizeIfNecessary(E element) { + if (!releaseReserved(element) || reservedSize <= 0) { + return false; + } + reservedSize--; + return true; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java index e6c8d248e2e..867c890a728 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java @@ -347,16 +347,18 @@ public class DriverScheduler implements IDriverScheduler, IService { return; case READY: task.setStatus(DriverTaskStatus.ABORTED); - readyQueue.remove(task.getDriverTaskId()); + if (readyQueue.remove(task.getDriverTaskId()) == null) { + readyQueue.decreaseReservedSize(task); + } break; case BLOCKED: task.setStatus(DriverTaskStatus.ABORTED); blockedTasks.remove(task); - readyQueue.decreaseReservedSize(); + readyQueue.decreaseReservedSize(task); break; case RUNNING: task.setStatus(DriverTaskStatus.ABORTED); - readyQueue.decreaseReservedSize(); + readyQueue.decreaseReservedSize(task); break; case FINISHED: break; @@ -422,6 +424,10 @@ public class DriverScheduler implements IDriverScheduler, IService { return readyQueue.size(); } + public long getReadyQueueReservedTaskCount() { + return readyQueue.getReservedSize(); + } + public long getBlockQueueTaskCount() { return blockedTasks.size(); } @@ -497,6 +503,7 @@ public class DriverScheduler implements IDriverScheduler, IService { task.lock(); try { if (task.getStatus() != DriverTaskStatus.READY) { + readyQueue.decreaseReservedSize(task); return false; } @@ -553,7 +560,7 @@ public class DriverScheduler implements IDriverScheduler, IService { } task.updateSchedulePriority(context); task.setStatus(DriverTaskStatus.FINISHED); - readyQueue.decreaseReservedSize(); + readyQueue.decreaseReservedSize(task); } finally { task.unlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java index ab32a65959b..797f99760e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java @@ -191,6 +191,16 @@ public class MultilevelPriorityQueue extends IndexedBlockingReserveQueue<DriverT "MultilevelPriorityQueue does not support access element by get."); } + @Override + protected void markReserved(DriverTask task) { + task.markReservedInReadyQueue(); + } + + @Override + protected boolean releaseReserved(DriverTask task) { + return task.releaseReservedInReadyQueue(); + } + @Override protected void clearAllElements() { highestPriorityLevelQueue.clear(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java index ad1e1a687ae..c85d00e17e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java @@ -60,6 +60,7 @@ public class DriverTask implements IDIndexedAccessible { private final DriverTaskHandle driverTaskHandle; private long lastEnterReadyQueueTime; private long lastEnterBlockQueueTime; + private boolean reservedInReadyQueue; private long estimatedMemorySize; @@ -213,6 +214,18 @@ public class DriverTask implements IDIndexedAccessible { this.lastEnterBlockQueueTime = lastEnterBlockQueueTime; } + public void markReservedInReadyQueue() { + reservedInReadyQueue = true; + } + + public boolean releaseReservedInReadyQueue() { + if (!reservedInReadyQueue) { + return false; + } + reservedInReadyQueue = false; + return true; + } + /** a comparator of ddl, the less the ddl is, the low order it has. */ public static class TimeoutComparator implements Comparator<DriverTask> { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java index 0b9e0c04f05..94aaccfd9b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java @@ -39,6 +39,7 @@ public class DriverSchedulerMetricSet implements IMetricSet { public static final String READY_QUEUED_TIME = "ready_queued_time"; public static final String BLOCK_QUEUED_TIME = "block_queued_time"; public static final String READY_QUEUE_TASK_COUNT = "ready_queue_task_count"; + public static final String READY_QUEUE_RESERVED_TASK_COUNT = "ready_queue_reserved_task_count"; public static final String BLOCK_QUEUE_TASK_COUNT = "block_queue_task_count"; private static final String TIMEOUT_QUEUE_SIZE = "timeout_queue_task_count"; private static final String QUERY_MAP_SIZE = "query_map_size"; @@ -67,6 +68,13 @@ public class DriverSchedulerMetricSet implements IMetricSet { DriverScheduler::getReadyQueueTaskCount, Tag.NAME.toString(), READY_QUEUE_TASK_COUNT); + metricService.createAutoGauge( + Metric.DRIVER_SCHEDULER.toString(), + MetricLevel.IMPORTANT, + DriverScheduler.getInstance(), + DriverScheduler::getReadyQueueReservedTaskCount, + Tag.NAME.toString(), + READY_QUEUE_RESERVED_TASK_COUNT); metricService.createAutoGauge( Metric.DRIVER_SCHEDULER.toString(), MetricLevel.IMPORTANT, @@ -109,6 +117,11 @@ public class DriverSchedulerMetricSet implements IMetricSet { Metric.DRIVER_SCHEDULER.toString(), Tag.NAME.toString(), READY_QUEUE_TASK_COUNT); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.DRIVER_SCHEDULER.toString(), + Tag.NAME.toString(), + READY_QUEUE_RESERVED_TASK_COUNT); metricService.remove( MetricType.AUTO_GAUGE, Metric.DRIVER_SCHEDULER.toString(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java index 0cc0e5edd71..9b3cb38cbc0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java @@ -197,6 +197,47 @@ public class DefaultDriverSchedulerTest { clear(); } + @Test + public void testAbortReadyTaskAfterPollReleasesReadyQueueReservation() throws Exception { + IMPPDataExchangeManager mockMPPDataExchangeManager = + Mockito.mock(IMPPDataExchangeManager.class); + manager.setBlockManager(mockMPPDataExchangeManager); + ITaskScheduler defaultScheduler = manager.getScheduler(); + IDriver mockDriver = Mockito.mock(IDriver.class); + DriverTaskHandle driverTaskHandle = + new DriverTaskHandle( + 1, + (MultilevelPriorityQueue) manager.getReadyQueue(), + OptionalInt.of(Integer.MAX_VALUE)); + + QueryId queryId = new QueryId("test"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0"); + DriverTaskId driverTaskID = new DriverTaskId(instanceId, 0); + Mockito.when(mockDriver.getDriverTaskId()).thenReturn(driverTaskID); + DriverTask testTask = + new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, driverTaskHandle, 0, false); + + try { + manager.registerTaskToQueryMap(queryId, testTask); + manager.submitTaskToReadyQueue(testTask); + + DriverTask polledTask = manager.getReadyQueue().poll(); + Assert.assertSame(testTask, polledTask); + Assert.assertEquals(1, manager.getReadyQueueReservedTaskCount()); + + manager.abortFragmentInstance(instanceId, new RuntimeException("test abort")); + + Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus()); + Assert.assertEquals(0, manager.getReadyQueue().size()); + Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount()); + Assert.assertFalse(defaultScheduler.readyToRunning(polledTask)); + Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount()); + } finally { + clear(); + } + } + @Test public void testRunningToReady() { IMPPDataExchangeManager mockMPPDataExchangeManager =
