This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch showQ
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f6f6a8c4a6373d30f7ee8a679e87c9105160748a
Author: Weihao Li <[email protected]>
AuthorDate: Mon Jun 29 12:34:35 2026 +0800

    fix some
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../fragment/FragmentInstanceContext.java          |  5 ++
 .../plan/planner/LocalExecutionPlanner.java        | 91 ++++++++++++++++------
 .../NotThreadSafeMemoryReservationManager.java     | 43 +++++++++-
 .../memory/ThreadSafeMemoryReservationManager.java |  5 ++
 .../LocalExecutionPlannerOperatorsMemoryTest.java  | 85 ++++++++++++++++++++
 5 files changed, 203 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 81dd89387bf..4cf29a80dda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
 import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
 import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryDataSource;
@@ -1365,6 +1366,10 @@ public class FragmentInstanceContext extends 
QueryContext {
 
   public void setHighestPriority(boolean highestPriority) {
     this.highestPriority = highestPriority;
+    if (memoryReservationManager instanceof 
NotThreadSafeMemoryReservationManager) {
+      ((NotThreadSafeMemoryReservationManager) memoryReservationManager)
+          .setHighestPriority(highestPriority);
+    }
   }
 
   public boolean isSingleSourcePath() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 95f28052c1f..470aa4257a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.queryengine.common.SqlDialect;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -118,7 +119,7 @@ public class LocalExecutionPlanner {
     context.invalidateParentPlanNodeIdToMemoryEstimator();
 
     // check whether current free memory is enough to execute current query
-    long estimatedMemorySize = checkMemory(memoryEstimator, 
instanceContext.getStateMachine());
+    long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext);
 
     context.addPipelineDriverFactory(root, context.getDriverContext(), 
estimatedMemorySize);
 
@@ -157,7 +158,7 @@ public class LocalExecutionPlanner {
     context.invalidateParentPlanNodeIdToMemoryEstimator();
 
     // check whether current free memory is enough to execute current query
-    checkMemory(memoryEstimator, instanceContext.getStateMachine());
+    checkMemory(memoryEstimator, instanceContext);
 
     context.addPipelineDriverFactory(root, context.getDriverContext(), 0);
 
@@ -193,7 +194,7 @@ public class LocalExecutionPlanner {
   }
 
   private long checkMemory(
-      final PipelineMemoryEstimator memoryEstimator, 
FragmentInstanceStateMachine stateMachine)
+      final PipelineMemoryEstimator memoryEstimator, FragmentInstanceContext 
instanceContext)
       throws MemoryNotEnoughException {
 
     // if it is disabled, just return
@@ -206,14 +207,75 @@ public class LocalExecutionPlanner {
 
     
QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize);
 
-    if (OPERATORS_MEMORY_BLOCK.allocate(estimatedMemorySize)) {
+    if (instanceContext.isHighestPriority()) {
+      return 0L;
+    }
+
+    long reservedBytes = allocateOperatorsMemory(estimatedMemorySize);
+    if (reservedBytes < 0) {
+      throw new MemoryNotEnoughException(
+          String.format(
+              "There is not enough memory to execute current fragment 
instance, "
+                  + "current remaining free memory is %dB, "
+                  + "estimated memory usage for current fragment instance is 
%dB",
+              OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), 
estimatedMemorySize));
+    }
+    FragmentInstanceStateMachine stateMachine = 
instanceContext.getStateMachine();
+    if (reservedBytes > 0) {
+      stateMachine.addStateChangeListener(
+          newState -> {
+            if (newState.isDone()) {
+              try (SetThreadName fragmentInstanceName =
+                  new 
SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
+                OPERATORS_MEMORY_BLOCK.release(reservedBytes);
+                if (LOGGER.isDebugEnabled()) {
+                  LOGGER.debug(
+                      "[ReleaseMemory] release: {}, current remaining memory: 
{}",
+                      reservedBytes,
+                      OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
+                }
+              }
+            }
+          });
+    }
+    return reservedBytes;
+  }
+
+  /**
+   * Try to reserve bytes from the operators memory block.
+   *
+   * @return allocated bytes on success ({@code >= 0}), {@code -1} if 
allocation failed
+   */
+  private long allocateOperatorsMemory(final long memoryInBytes) {
+    if (memoryInBytes <= 0) {
+      return 0L;
+    }
+    if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "[ConsumeMemory] consume: {}, current remaining memory: {}",
-            estimatedMemorySize,
+            memoryInBytes,
             OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
       }
-    } else {
+      return memoryInBytes;
+    }
+    return -1L;
+  }
+
+  @TestOnly
+  long allocateOperatorsMemoryForTest(final long memoryInBytes) {
+    return allocateOperatorsMemory(memoryInBytes);
+  }
+
+  @TestOnly
+  long reserveOperatorsMemoryForFragmentForTest(
+      final long estimatedMemorySize, final boolean isHighestPriority)
+      throws MemoryNotEnoughException {
+    if (isHighestPriority) {
+      return 0L;
+    }
+    long reservedBytes = allocateOperatorsMemory(estimatedMemorySize);
+    if (reservedBytes < 0) {
       throw new MemoryNotEnoughException(
           String.format(
               "There is not enough memory to execute current fragment 
instance, "
@@ -221,22 +283,7 @@ public class LocalExecutionPlanner {
                   + "estimated memory usage for current fragment instance is 
%dB",
               OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), 
estimatedMemorySize));
     }
-    stateMachine.addStateChangeListener(
-        newState -> {
-          if (newState.isDone()) {
-            try (SetThreadName fragmentInstanceName =
-                new 
SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
-              OPERATORS_MEMORY_BLOCK.release(estimatedMemorySize);
-              if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(
-                    "[ReleaseMemory] release: {}, current remaining memory: 
{}",
-                    estimatedMemorySize,
-                    OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
-              }
-            }
-          }
-        });
-    return estimatedMemorySize;
+    return reservedBytes;
   }
 
   private QueryDataSourceType getQueryDataSourceType(DataDriverContext 
dataDriverContext) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index d156628532c..9f49c5c7786 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.planner.memory;
 
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 
@@ -39,6 +40,8 @@ public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationM
 
   private final String contextHolder;
 
+  private boolean isHighestPriority;
+
   private long reservedBytesInTotal = 0;
 
   private long bytesToBeReserved = 0;
@@ -50,8 +53,24 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
     this.contextHolder = contextHolder;
   }
 
+  public void setHighestPriority(boolean isHighestPriority) {
+    this.isHighestPriority = isHighestPriority;
+  }
+
+  public boolean isHighestPriority() {
+    return isHighestPriority;
+  }
+
+  @TestOnly
+  public long getReservedBytesInTotalForTest() {
+    return reservedBytesInTotal;
+  }
+
   @Override
   public void reserveMemoryCumulatively(final long size) {
+    if (isHighestPriority) {
+      return;
+    }
     bytesToBeReserved += size;
     if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) {
       reserveMemoryImmediately();
@@ -60,6 +79,9 @@ public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationM
 
   @Override
   public void reserveMemoryImmediately() {
+    if (isHighestPriority) {
+      return;
+    }
     if (bytesToBeReserved != 0) {
       LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
           bytesToBeReserved, reservedBytesInTotal, queryId.getId(), 
contextHolder);
@@ -70,15 +92,19 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
 
   @Override
   public void reserveMemoryImmediately(final long size) {
-    if (size != 0) {
-      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
-          size, reservedBytesInTotal, queryId.getId(), contextHolder);
-      reservedBytesInTotal += size;
+    if (isHighestPriority || size == 0) {
+      return;
     }
+    LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+        size, reservedBytesInTotal, queryId.getId(), contextHolder);
+    reservedBytesInTotal += size;
   }
 
   @Override
   public void releaseMemoryCumulatively(final long size) {
+    if (isHighestPriority) {
+      return;
+    }
     bytesToBeReleased += size;
     if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) {
       long bytesToRelease;
@@ -96,6 +122,9 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
 
   @Override
   public void releaseAllReservedMemory() {
+    if (isHighestPriority) {
+      return;
+    }
     if (reservedBytesInTotal != 0) {
       
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
       reservedBytesInTotal = 0;
@@ -106,6 +135,9 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
 
   @Override
   public Pair<Long, Long> releaseMemoryVirtually(final long size) {
+    if (isHighestPriority) {
+      return new Pair<>(size, 0L);
+    }
     if (bytesToBeReserved >= size) {
       bytesToBeReserved -= size;
       return new Pair<>(size, 0L);
@@ -121,6 +153,9 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
   @Override
   public void reserveMemoryVirtually(
       final long bytesToBeReserved, final long bytesAlreadyReserved) {
+    if (isHighestPriority) {
+      return;
+    }
     reservedBytesInTotal += bytesAlreadyReserved;
     reserveMemoryCumulatively(bytesToBeReserved);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index 2a544421f3f..0a1c6eee418 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -66,4 +66,9 @@ public class ThreadSafeMemoryReservationManager extends 
NotThreadSafeMemoryReser
       final long bytesToBeReserved, final long bytesAlreadyReserved) {
     super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
   }
+
+  @Override
+  public synchronized void setHighestPriority(boolean isHighestPriority) {
+    super.setHighestPriority(isHighestPriority);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java
new file mode 100644
index 00000000000..4f255d642be
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LocalExecutionPlannerOperatorsMemoryTest {
+
+  private static final LocalExecutionPlanner PLANNER = 
LocalExecutionPlanner.getInstance();
+
+  private long bytesHeldByTest = 0L;
+
+  @After
+  public void tearDown() {
+    if (bytesHeldByTest > 0) {
+      PLANNER.releaseToFreeMemoryForOperators(bytesHeldByTest);
+      bytesHeldByTest = 0L;
+    }
+  }
+
+  @Test
+  public void testAllocateOperatorsMemoryFailsWhenInsufficient() {
+    long free = PLANNER.getFreeMemoryForOperators();
+    Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 
1024L));
+  }
+
+  @Test
+  public void testAllocateOperatorsMemorySucceedsWhenAvailable() {
+    long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators());
+    long reserved = PLANNER.allocateOperatorsMemoryForTest(request);
+    Assert.assertEquals(request, reserved);
+    bytesHeldByTest = reserved;
+  }
+
+  @Test
+  public void testHighestPriorityBypassWithoutConsumingOperatorsPool()
+      throws MemoryNotEnoughException {
+    long freeBefore = PLANNER.getFreeMemoryForOperators();
+    long request = freeBefore + 1024L;
+
+    Assert.assertEquals(0L, 
PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true));
+    Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators());
+  }
+
+  @Test
+  public void testHighestPriorityBypassWhenPoolInsufficient() throws 
MemoryNotEnoughException {
+    long free = PLANNER.getFreeMemoryForOperators();
+    long request = free + 1024L;
+
+    Assert.assertEquals(0L, 
PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true));
+  }
+
+  @Test
+  public void testNormalPriorityThrowsWhenPoolInsufficient() {
+    long free = PLANNER.getFreeMemoryForOperators();
+    long request = free + 1024L;
+
+    try {
+      PLANNER.reserveOperatorsMemoryForFragmentForTest(request, false);
+      Assert.fail("Expect MemoryNotEnoughException");
+    } catch (MemoryNotEnoughException ignore) {
+    }
+  }
+}

Reply via email to