This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3853d380fc1 Fix driver scheduler ready queue reservation leak (#17919)
3853d380fc1 is described below

commit 3853d380fc1ff612e3153275f2e854ec634dcf03
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 11 19:28:45 2026 +0800

    Fix driver scheduler ready queue reservation leak (#17919)
---
 .../queue/IndexedBlockingReserveQueue.java         | 36 +++++++++++++++++--
 .../execution/schedule/DriverScheduler.java        | 15 +++++---
 .../multilevelqueue/MultilevelPriorityQueue.java   | 10 ++++++
 .../execution/schedule/task/DriverTask.java        | 13 +++++++
 .../metric/DriverSchedulerMetricSet.java           | 13 +++++++
 .../schedule/DefaultDriverSchedulerTest.java       | 41 ++++++++++++++++++++++
 6 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
index ce18e18b28a..b1040e7c64a 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
@@ -49,6 +49,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
     E output = pollFirst();
     size--;
     reservedSize++;
+    markReserved(output);
     return output;
   }
 
@@ -68,7 +69,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
       throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL);
     }
     pushToQueue(element);
-    reservedSize--;
+    decreaseReservedSizeIfNecessary(element);
     size++;
     this.notifyAll();
   }
@@ -76,7 +77,36 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
   /**
    * For task that is not in readyQueue when it's cleared, it won't be added into the queue again.
    */
-  public synchronized void decreaseReservedSize() {
-    this.reservedSize--;
+  public synchronized boolean decreaseReservedSize(E element) {
+    if (element == null) {
+      throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL);
+    }
+    return decreaseReservedSizeIfNecessary(element);
+  }
+
+  public final synchronized int getReservedSize() {
+    return reservedSize;
+  }
+
+  @Override
+  public synchronized void clear() {
+    super.clear();
+    this.reservedSize = 0;
+  }
+
+  protected void markReserved(E element) {
+    // Do nothing by default.
+  }
+
+  protected boolean releaseReserved(E element) {
+    return true;
+  }
+
+  private boolean decreaseReservedSizeIfNecessary(E element) {
+    if (!releaseReserved(element) || reservedSize <= 0) {
+      return false;
+    }
+    reservedSize--;
+    return true;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index e6c8d248e2e..867c890a728 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -347,16 +347,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
             return;
           case READY:
             task.setStatus(DriverTaskStatus.ABORTED);
-            readyQueue.remove(task.getDriverTaskId());
+            if (readyQueue.remove(task.getDriverTaskId()) == null) {
+              readyQueue.decreaseReservedSize(task);
+            }
             break;
           case BLOCKED:
             task.setStatus(DriverTaskStatus.ABORTED);
             blockedTasks.remove(task);
-            readyQueue.decreaseReservedSize();
+            readyQueue.decreaseReservedSize(task);
             break;
           case RUNNING:
             task.setStatus(DriverTaskStatus.ABORTED);
-            readyQueue.decreaseReservedSize();
+            readyQueue.decreaseReservedSize(task);
             break;
           case FINISHED:
             break;
@@ -422,6 +424,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     return readyQueue.size();
   }
 
+  public long getReadyQueueReservedTaskCount() {
+    return readyQueue.getReservedSize();
+  }
+
   public long getBlockQueueTaskCount() {
     return blockedTasks.size();
   }
@@ -497,6 +503,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
       task.lock();
       try {
         if (task.getStatus() != DriverTaskStatus.READY) {
+          readyQueue.decreaseReservedSize(task);
           return false;
         }
 
@@ -553,7 +560,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
-        readyQueue.decreaseReservedSize();
+        readyQueue.decreaseReservedSize(task);
       } finally {
         task.unlock();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
index ab32a65959b..797f99760e3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -191,6 +191,16 @@ public class MultilevelPriorityQueue extends IndexedBlockingReserveQueue<DriverT
         "MultilevelPriorityQueue does not support access element by get.");
   }
 
+  @Override
+  protected void markReserved(DriverTask task) {
+    task.markReservedInReadyQueue();
+  }
+
+  @Override
+  protected boolean releaseReserved(DriverTask task) {
+    return task.releaseReservedInReadyQueue();
+  }
+
   @Override
   protected void clearAllElements() {
     highestPriorityLevelQueue.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
index ad1e1a687ae..c85d00e17e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
@@ -60,6 +60,7 @@ public class DriverTask implements IDIndexedAccessible {
   private final DriverTaskHandle driverTaskHandle;
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
+  private boolean reservedInReadyQueue;
 
   private long estimatedMemorySize;
 
@@ -213,6 +214,18 @@ public class DriverTask implements IDIndexedAccessible {
     this.lastEnterBlockQueueTime = lastEnterBlockQueueTime;
   }
 
+  public void markReservedInReadyQueue() {
+    reservedInReadyQueue = true;
+  }
+
+  public boolean releaseReservedInReadyQueue() {
+    if (!reservedInReadyQueue) {
+      return false;
+    }
+    reservedInReadyQueue = false;
+    return true;
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements Comparator<DriverTask> {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
index 0b9e0c04f05..94aaccfd9b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
@@ -39,6 +39,7 @@ public class DriverSchedulerMetricSet implements IMetricSet {
   public static final String READY_QUEUED_TIME = "ready_queued_time";
   public static final String BLOCK_QUEUED_TIME = "block_queued_time";
   public static final String READY_QUEUE_TASK_COUNT = "ready_queue_task_count";
+  public static final String READY_QUEUE_RESERVED_TASK_COUNT = 
"ready_queue_reserved_task_count";
   public static final String BLOCK_QUEUE_TASK_COUNT = "block_queue_task_count";
   private static final String TIMEOUT_QUEUE_SIZE = "timeout_queue_task_count";
   private static final String QUERY_MAP_SIZE = "query_map_size";
@@ -67,6 +68,13 @@ public class DriverSchedulerMetricSet implements IMetricSet {
         DriverScheduler::getReadyQueueTaskCount,
         Tag.NAME.toString(),
         READY_QUEUE_TASK_COUNT);
+    metricService.createAutoGauge(
+        Metric.DRIVER_SCHEDULER.toString(),
+        MetricLevel.IMPORTANT,
+        DriverScheduler.getInstance(),
+        DriverScheduler::getReadyQueueReservedTaskCount,
+        Tag.NAME.toString(),
+        READY_QUEUE_RESERVED_TASK_COUNT);
     metricService.createAutoGauge(
         Metric.DRIVER_SCHEDULER.toString(),
         MetricLevel.IMPORTANT,
@@ -109,6 +117,11 @@ public class DriverSchedulerMetricSet implements IMetricSet {
         Metric.DRIVER_SCHEDULER.toString(),
         Tag.NAME.toString(),
         READY_QUEUE_TASK_COUNT);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.DRIVER_SCHEDULER.toString(),
+        Tag.NAME.toString(),
+        READY_QUEUE_RESERVED_TASK_COUNT);
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.DRIVER_SCHEDULER.toString(),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
index 0cc0e5edd71..9b3cb38cbc0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
@@ -197,6 +197,47 @@ public class DefaultDriverSchedulerTest {
     clear();
   }
 
+  @Test
+  public void testAbortReadyTaskAfterPollReleasesReadyQueueReservation() 
throws Exception {
+    IMPPDataExchangeManager mockMPPDataExchangeManager =
+        Mockito.mock(IMPPDataExchangeManager.class);
+    manager.setBlockManager(mockMPPDataExchangeManager);
+    ITaskScheduler defaultScheduler = manager.getScheduler();
+    IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
+
+    QueryId queryId = new QueryId("test");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+    DriverTaskId driverTaskID = new DriverTaskId(instanceId, 0);
+    Mockito.when(mockDriver.getDriverTaskId()).thenReturn(driverTaskID);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, 
driverTaskHandle, 0, false);
+
+    try {
+      manager.registerTaskToQueryMap(queryId, testTask);
+      manager.submitTaskToReadyQueue(testTask);
+
+      DriverTask polledTask = manager.getReadyQueue().poll();
+      Assert.assertSame(testTask, polledTask);
+      Assert.assertEquals(1, manager.getReadyQueueReservedTaskCount());
+
+      manager.abortFragmentInstance(instanceId, new RuntimeException("test 
abort"));
+
+      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
+      Assert.assertEquals(0, manager.getReadyQueue().size());
+      Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+      Assert.assertFalse(defaultScheduler.readyToRunning(polledTask));
+      Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+    } finally {
+      clear();
+    }
+  }
+
   @Test
   public void testRunningToReady() {
     IMPPDataExchangeManager mockMPPDataExchangeManager =

Reply via email to