This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 290842e77df [To dev/1.3] Fix driver scheduler ready queue reservation leak (#17920)
290842e77df is described below

commit 290842e77df127b0b02130bdde5fa180c562be62
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 11 19:38:27 2026 +0800

    [To dev/1.3] Fix driver scheduler ready queue reservation leak (#17920)
---
 .../execution/schedule/DriverScheduler.java        | 15 +++++---
 .../queue/IndexedBlockingReserveQueue.java         | 36 +++++++++++++++++--
 .../multilevelqueue/MultilevelPriorityQueue.java   | 10 ++++++
 .../execution/schedule/task/DriverTask.java        | 13 +++++++
 .../metric/DriverSchedulerMetricSet.java           | 13 +++++++
 .../schedule/DefaultDriverSchedulerTest.java       | 41 ++++++++++++++++++++++
 6 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 183e51b7e1e..239ebb11329 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -336,16 +336,18 @@ public class DriverScheduler implements IDriverScheduler, IService {
             return;
           case READY:
             task.setStatus(DriverTaskStatus.ABORTED);
-            readyQueue.remove(task.getDriverTaskId());
+            if (readyQueue.remove(task.getDriverTaskId()) == null) {
+              readyQueue.decreaseReservedSize(task);
+            }
             break;
           case BLOCKED:
             task.setStatus(DriverTaskStatus.ABORTED);
             blockedTasks.remove(task);
-            readyQueue.decreaseReservedSize();
+            readyQueue.decreaseReservedSize(task);
             break;
           case RUNNING:
             task.setStatus(DriverTaskStatus.ABORTED);
-            readyQueue.decreaseReservedSize();
+            readyQueue.decreaseReservedSize(task);
             break;
           case FINISHED:
             break;
@@ -414,6 +416,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
     return readyQueue.size();
   }
 
+  public long getReadyQueueReservedTaskCount() {
+    return readyQueue.getReservedSize();
+  }
+
   public long getBlockQueueTaskCount() {
     return blockedTasks.size();
   }
@@ -489,6 +495,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
       task.lock();
       try {
         if (task.getStatus() != DriverTaskStatus.READY) {
+          readyQueue.decreaseReservedSize(task);
           return false;
         }
 
@@ -545,7 +552,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
-        readyQueue.decreaseReservedSize();
+        readyQueue.decreaseReservedSize(task);
       } finally {
         task.unlock();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
index 1891fb63556..b0f0edbd0e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingReserveQueue.java
@@ -47,6 +47,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
     E output = pollFirst();
     size--;
     reservedSize++;
+    markReserved(output);
     return output;
   }
 
@@ -66,7 +67,7 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
       throw new NullPointerException("pushed element is null");
     }
     pushToQueue(element);
-    reservedSize--;
+    decreaseReservedSizeIfNecessary(element);
     size++;
     this.notifyAll();
   }
@@ -74,7 +75,36 @@ public abstract class IndexedBlockingReserveQueue<E extends IDIndexedAccessible>
   /**
    * For task that is not in readyQueue when it's cleared, it won't be added 
into the queue again.
    */
-  public synchronized void decreaseReservedSize() {
-    this.reservedSize--;
+  public synchronized boolean decreaseReservedSize(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    return decreaseReservedSizeIfNecessary(element);
+  }
+
+  public final synchronized int getReservedSize() {
+    return reservedSize;
+  }
+
+  @Override
+  public synchronized void clear() {
+    super.clear();
+    this.reservedSize = 0;
+  }
+
+  protected void markReserved(E element) {
+    // Do nothing by default.
+  }
+
+  protected boolean releaseReserved(E element) {
+    return true;
+  }
+
+  private boolean decreaseReservedSizeIfNecessary(E element) {
+    if (!releaseReserved(element) || reservedSize <= 0) {
+      return false;
+    }
+    reservedSize--;
+    return true;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
index c5f3cb3e6df..06c7275473b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -191,6 +191,16 @@ public class MultilevelPriorityQueue extends IndexedBlockingReserveQueue<DriverT
         "MultilevelPriorityQueue does not support access element by get.");
   }
 
+  @Override
+  protected void markReserved(DriverTask task) {
+    task.markReservedInReadyQueue();
+  }
+
+  @Override
+  protected boolean releaseReserved(DriverTask task) {
+    return task.releaseReservedInReadyQueue();
+  }
+
   @Override
   protected void clearAllElements() {
     highestPriorityLevelQueue.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
index f320c6cad6c..85bada856ca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java
@@ -59,6 +59,7 @@ public class DriverTask implements IDIndexedAccessible {
   private final DriverTaskHandle driverTaskHandle;
   private long lastEnterReadyQueueTime;
   private long lastEnterBlockQueueTime;
+  private boolean reservedInReadyQueue;
 
   private long estimatedMemorySize;
 
@@ -210,6 +211,18 @@ public class DriverTask implements IDIndexedAccessible {
     this.lastEnterBlockQueueTime = lastEnterBlockQueueTime;
   }
 
+  public void markReservedInReadyQueue() {
+    reservedInReadyQueue = true;
+  }
+
+  public boolean releaseReservedInReadyQueue() {
+    if (!reservedInReadyQueue) {
+      return false;
+    }
+    reservedInReadyQueue = false;
+    return true;
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements Comparator<DriverTask> {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
index 0b9e0c04f05..94aaccfd9b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/DriverSchedulerMetricSet.java
@@ -39,6 +39,7 @@ public class DriverSchedulerMetricSet implements IMetricSet {
   public static final String READY_QUEUED_TIME = "ready_queued_time";
   public static final String BLOCK_QUEUED_TIME = "block_queued_time";
   public static final String READY_QUEUE_TASK_COUNT = "ready_queue_task_count";
+  public static final String READY_QUEUE_RESERVED_TASK_COUNT = 
"ready_queue_reserved_task_count";
   public static final String BLOCK_QUEUE_TASK_COUNT = "block_queue_task_count";
   private static final String TIMEOUT_QUEUE_SIZE = "timeout_queue_task_count";
   private static final String QUERY_MAP_SIZE = "query_map_size";
@@ -67,6 +68,13 @@ public class DriverSchedulerMetricSet implements IMetricSet {
         DriverScheduler::getReadyQueueTaskCount,
         Tag.NAME.toString(),
         READY_QUEUE_TASK_COUNT);
+    metricService.createAutoGauge(
+        Metric.DRIVER_SCHEDULER.toString(),
+        MetricLevel.IMPORTANT,
+        DriverScheduler.getInstance(),
+        DriverScheduler::getReadyQueueReservedTaskCount,
+        Tag.NAME.toString(),
+        READY_QUEUE_RESERVED_TASK_COUNT);
     metricService.createAutoGauge(
         Metric.DRIVER_SCHEDULER.toString(),
         MetricLevel.IMPORTANT,
@@ -109,6 +117,11 @@ public class DriverSchedulerMetricSet implements IMetricSet {
         Metric.DRIVER_SCHEDULER.toString(),
         Tag.NAME.toString(),
         READY_QUEUE_TASK_COUNT);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.DRIVER_SCHEDULER.toString(),
+        Tag.NAME.toString(),
+        READY_QUEUE_RESERVED_TASK_COUNT);
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.DRIVER_SCHEDULER.toString(),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
index 0cc0e5edd71..595d5febe4c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
@@ -197,6 +197,47 @@ public class DefaultDriverSchedulerTest {
     clear();
   }
 
+  @Test
+  public void testAbortReadyTaskAfterPollReleasesReadyQueueReservation() 
throws Exception {
+    IMPPDataExchangeManager mockMPPDataExchangeManager =
+        Mockito.mock(IMPPDataExchangeManager.class);
+    manager.setBlockManager(mockMPPDataExchangeManager);
+    ITaskScheduler defaultScheduler = manager.getScheduler();
+    IDriver mockDriver = Mockito.mock(IDriver.class);
+    DriverTaskHandle driverTaskHandle =
+        new DriverTaskHandle(
+            1,
+            (MultilevelPriorityQueue) manager.getReadyQueue(),
+            OptionalInt.of(Integer.MAX_VALUE));
+
+    QueryId queryId = new QueryId("test");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "inst-0");
+    DriverTaskId driverTaskID = new DriverTaskId(instanceId, 0);
+    Mockito.when(mockDriver.getDriverTaskId()).thenReturn(driverTaskID);
+    DriverTask testTask =
+        new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, 
driverTaskHandle, 0, false);
+
+    try {
+      manager.registerTaskToQueryMap(queryId, testTask);
+      manager.submitTaskToReadyQueue(testTask);
+
+      DriverTask polledTask = manager.getReadyQueue().poll();
+      Assert.assertSame(testTask, polledTask);
+      Assert.assertEquals(1, manager.getReadyQueueReservedTaskCount());
+
+      manager.abortFragmentInstance(instanceId);
+
+      Assert.assertEquals(DriverTaskStatus.ABORTED, testTask.getStatus());
+      Assert.assertEquals(0, manager.getReadyQueue().size());
+      Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+      Assert.assertFalse(defaultScheduler.readyToRunning(polledTask));
+      Assert.assertEquals(0, manager.getReadyQueueReservedTaskCount());
+    } finally {
+      clear();
+    }
+  }
+
   @Test
   public void testRunningToReady() {
     IMPPDataExchangeManager mockMPPDataExchangeManager =

Reply via email to