This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 290842e77df [To dev/1.3] Fix driver scheduler ready queue reservation
leak (#17920)
290842e77df is described below
commit 290842e77df127b0b02130bdde5fa180c562be62
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 11 19:38:27 2026 +0800
[To dev/1.3] Fix driver scheduler ready queue reservation leak (#17920)
---
.../execution/schedule/DriverScheduler.java | 15 +++++---
.../queue/IndexedBlockingReserveQueue.java | 36 +++++++++++++++++--
.../multilevelqueue/MultilevelPriorityQueue.java | 10 ++++++
.../execution/schedule/task/DriverTask.java | 13 +++++++
.../metric/DriverSchedulerMetricSet.java | 13 +++++++
.../schedule/DefaultDriverSchedulerTest.java | 41 ++++++++++++++++++++++
6 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 183e51b7e1e..239ebb11329 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -336,16 +336,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
return;
case READY:
task.setStatus(DriverTaskStatus.ABORTED);
- readyQueue.remove(task.getDriverTaskId());
+ if (readyQueue.remove(task.getDriverTaskId()) == null) {
+ readyQueue.decreaseReservedSize(task);
+ }
break;
case BLOCKED:
task.setStatus(DriverTaskStatus.ABORTED);
blockedTasks.remove(task);
- readyQueue.decreaseReservedSize();
+ readyQueue.decreaseReservedSize(task);
break;
case RUNNING:
task.setStatus(DriverTaskStatus.ABORTED);
- readyQueue.decreaseReservedSize();
+ readyQueue.decreaseReservedSize(task);
break;
case FINISHED:
break;
@@ -414,6 +416,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
return readyQueue.size();
}
+ public long getReadyQueueReservedTaskCount() {
+ return readyQueue.getReservedSize();
+ }
+
public long getBlockQueueTaskCount() {
return blockedTasks.size();
}
@@ -489,6 +495,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
task.lock();
try {
if (task.getStatus() != DriverTaskStatus.READY) {
+ readyQueue.decreaseReservedSize(task);
return false;
}
@@ -545,7 +552,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
- readyQueue.decreaseReservedSize();
+ readyQueue.decreaseReservedSize(task);
} finally {
task.unlock();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
index 1891fb63556..b0f0edbd0e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
@@ -47,6 +47,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
E output = pollFirst();
size--;
reservedSize++;
+ markReserved(output);
return output;
}
@@ -66,7 +67,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
throw new NullPointerException("pushed element is null");
}
pushToQueue(element);
- reservedSize--;
+ decreaseReservedSizeIfNecessary(element);
size++;
this.notifyAll();
}
@@ -74,7 +75,36 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
/**
* For task that is not in readyQueue when it's cleared, it won't be added into the queue again.
*/
- public synchronized void decreaseReservedSize() {
- this.reservedSize--;
+ public synchronized boolean decreaseReservedSize(E element) {
+ if (element == null) {
+ throw new NullPointerException("pushed element is null");
+ }
+ return decreaseReservedSizeIfNecessary(element);
+ }
+
+ public final synchronized int getReservedSize() {
+ return reservedSize;
+ }
+
+ @Override
+ public synchronized void clear() {
+ super.clear();
+ this.reservedSize = 0;
+ }
+
+ protected void markReserved(E element) {
+ // Do nothing by default.
+ }
+
+ protected boolean releaseReserved(E element) {
+ return true;
+ }
+
+ private boolean decreaseReservedSizeIfNecessary(E element) {
+ if (!releaseReserved(element) || reservedSize <= 0) {
+ return false;
+ }
+ reservedSize--;
+ return true;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
index c5f3cb3e6df..06c7275473b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -191,6 +191,16 @@ public class MultilevelPriorityQueue extends IndexedBlockingReserveQueue<DriverT
"MultilevelPriorityQueue does not support access element by get.");
}
+ @Override
+ protected void markReserved(DriverTask task) {
+ task.markReservedInReadyQueue();
+ }
+
+ @Override
+ protected boolean releaseReserved(DriverTask task) {
+ return task.releaseReservedInReadyQueue();
+ }
+
@Override
protected void clearAllElements() {
highestPriorityLevelQueue.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
index f320c6cad6c..85bada856ca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
@@ -59,6 +59,7 @@ public class DriverTask implements IDIndexedAccessible {
private final DriverTaskHandle driverTaskHandle;
private long lastEnterReadyQueueTime;
private long lastEnterBlockQueueTime;
+ private boolean reservedInReadyQueue;
private long estimatedMemorySize;
@@ -210,6 +211,18 @@ public class DriverTask implements IDIndexedAccessible {
this.lastEnterBlockQueueTime = lastEnterBlockQueueTime;
}
+ public void markReservedInReadyQueue() {
+ reservedInReadyQueue = true;
+ }
+
+ public boolean releaseReservedInReadyQueue() {
+ if (!reservedInReadyQueue) {
+ return false;
+ }
+ reservedInReadyQueue = false;
+ return true;
+ }
+
/** a comparator of ddl, the less the ddl is, the low order it has. */
public static class TimeoutComparator implements Comparator<DriverTask> {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
index 0b9e0c04f05..94aaccfd9b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
@@ -39,6 +39,7 @@ public class DriverSchedulerMetricSet implements IMetricSet {
public static final String READY_QUEUED_TIME = "ready_queued_time";
public static final String BLOCK_QUEUED_TIME = "block_queued_time";
public static final String READY_QUEUE_TASK_COUNT = "ready_queue_task_count";
+ public static final String READY_QUEUE_RESERVED_TASK_COUNT = "ready_queue_reserved_task_count";
public static final String BLOCK_QUEUE_TASK_COUNT = "block_queue_task_count";
private static final String TIMEOUT_QUEUE_SIZE = "timeout_queue_task_count";
private static final String QUERY_MAP_SIZE = "query_map_size";
@@ -67,6 +68,13 @@ public class DriverSchedulerMetricSet implements IMetricSet {
DriverScheduler::getReadyQueueTaskCount,
Tag.NAME.toString(),
READY_QUEUE_TASK_COUNT);
+ metricService.createAutoGauge(
+ Metric.DRIVER_SCHEDULER.toString(),
+ MetricLevel.IMPORTANT,
+ DriverScheduler.getInstance(),
+ DriverScheduler::getReadyQueueReservedTaskCount,
+ Tag.NAME.toString(),
+ READY_QUEUE_RESERVED_TASK_COUNT);
metricService.createAutoGauge(
Metric.DRIVER_SCHEDULER.toString(),
MetricLevel.IMPORTANT,
@@ -109,6 +117,11 @@ public class DriverSchedulerMetricSet implements IMetricSet {
Metric.DRIVER_SCHEDULER.toString(),
Tag.NAME.toString(),
READY_QUEUE_TASK_COUNT);
+ metricService.remove(
+ MetricType.AUTO_GAUGE,
+ Metric.DRIVER_SCHEDULER.toString(),
+ Tag.NAME.toString(),
+ READY_QUEUE_RESERVED_TASK_COUNT);
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.DRIVER_SCHEDULER.toString(),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
index 0cc0e5edd71..595d5febe4c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
@@ -197,6 +197,47 @@ public class DefaultDriverSchedulerTest {
clear();
}
+ @Test
+ public void testAbortReadyTaskAfterPollReleasesReadyQueueReservation() throws Exception {
+ IMPPDataExchangeManager mockMPPDataExchangeManager =
+ Mockito.mock(IMPPDataExchangeManager.class);
+ manager.setBlockManager(mockMPPDataExchangeManager);
+ ITaskScheduler defaultScheduler = manager.getScheduler();
+ IDriver mockDriver = Mockito.mock(IDriver.class);
+ DriverTaskHandle driverTaskHandle =
+ new DriverTaskHandle(
+ 1,
+ (MultilevelPriorityQueue) manager.getReadyQueue(),
+ OptionalInt.of(Integer.MAX_VALUE));
+
+ QueryId queryId = new QueryId("test");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+ DriverTaskId driverTaskID = new DriverTaskId(instanceId, 0);
+ Mockito.when(mockDriver.getDriverTaskId()).thenReturn(driverTaskID);
+ DriverTask testTask =
+ new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, driverTaskHandle, 0, false);
+
+ try {
+ manager.registerTaskToQueryMap(queryId, testTask);
+ manager.submitTaskToReadyQueue(testTask);
+
+ DriverTask polledTask = manager.getReadyQueue().poll();
+ Assert.assertSame(testTask, polledTask);
+ Assert.assertEquals(1, manager.getReadyQueueReservedTaskCount());
+
+ manager.abortFragmentInstance(instanceId);
+
+ Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
+ Assert.assertEquals(0, manager.getReadyQueue().size());
+ Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+ Assert.assertFalse(defaultScheduler.readyToRunning(polledTask));
+ Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+ } finally {
+ clear();
+ }
+ }
+
@Test
public void testRunningToReady() {
IMPPDataExchangeManager mockMPPDataExchangeManager =