This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 80fce1a741ddb740ceb9e7c43817bddb0aae7896
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jun 11 15:34:59 2026 +0800

    add memory control
---
 .../db/queryengine/common/MPPQueryContext.java     |  6 ++-
 .../readTsFile/ExternalTsFileQueryResource.java    | 57 +++++++++++++++++-----
 2 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 8b1303ef44e..68d2de1c2a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -261,7 +261,7 @@ public class MPPQueryContext implements IAuditEntity {
     }
     ExternalTsFileQueryResource externalTsFileQueryResource =
         new ExternalTsFileQueryResource(
-            queryId,
+            this,
             Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir())
                 .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR)
                 .resolve(queryId.getId())
@@ -517,6 +517,10 @@ public class MPPQueryContext implements IAuditEntity {
     this.memoryReservationManager.reserveMemoryImmediately();
   }
 
+  public void reserveMemoryForFrontEndImmediately(final long bytes) {
+    this.memoryReservationManager.reserveMemoryImmediately(bytes);
+  }
+
   public void releaseAllMemoryReservedForFrontEnd() {
     this.memoryReservationManager.releaseAllReservedMemory();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
index 4d85a11dbc6..74f39f73613 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
 
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
 import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.commons.schema.filter.SchemaFilter;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
@@ -70,9 +72,9 @@ import static java.util.Objects.requireNonNull;
 public class ExternalTsFileQueryResource implements AutoCloseable {
 
   public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile";
-  private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024;
 
   private final QueryId queryId;
+  private final MPPQueryContext queryContext;
   private final Path queryTempRoot;
   private final String tableName;
   private final List<String> tsFilePaths;
@@ -85,18 +87,18 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
   private volatile boolean closed;
 
   public ExternalTsFileQueryResource(
-      QueryId queryId,
+      MPPQueryContext queryContext,
       Path tempRoot,
       String tableName,
       List<String> tsFilePaths,
       LongConsumer ioSizeRecorder,
       boolean useExactTempRoot) {
-    this.queryId = queryId;
+    this.queryContext = requireNonNull(queryContext, "queryContext is null");
+    this.queryId = queryContext.getQueryId();
     this.queryTempRoot =
         useExactTempRoot
             ? requireNonNull(tempRoot, "tempRoot is null")
-            : requireNonNull(tempRoot, "tempRoot is null")
-                .resolve(requireNonNull(queryId, "queryId is null").getId());
+            : requireNonNull(tempRoot, "tempRoot is null").resolve(this.queryId.getId());
     this.tableName = tableName;
     this.tsFilePaths =
         Collections.unmodifiableList(new ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths")));
@@ -128,7 +130,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
             deviceTaskPartitions.get(
                 Math.floorMod(deviceID.hashCode(), deviceTaskPartitions.size()));
         partition.add(deviceTask);
-        if (partition.getEstimatedSizeInBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) {
+        if (partition.shouldFlush()) {
           partition.flush(comparator);
         }
       }
@@ -225,11 +227,15 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
 
   public class DeviceTaskPartition {
 
+    private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024;
+    private static final long MEMORY_RESERVE_BATCH_SIZE_IN_BYTES = 1024 * 1024;
+
     private final int partitionIndex;
     private final List<DeviceTask> pendingDeviceTasks = new ArrayList<>();
     private final List<Integer> deviceEntryIndexes = new ArrayList<>();
     private final List<Path> runFiles = new ArrayList<>();
-    private long estimatedSizeInBytes;
+    private long reservedBytes;
+    private long unreservedBytes;
 
     private DeviceTaskPartition(int partitionIndex) {
       this.partitionIndex = partitionIndex;
@@ -245,7 +251,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
 
     private void add(DeviceTask deviceTask) {
       pendingDeviceTasks.add(deviceTask);
-      estimatedSizeInBytes += deviceTask.ramBytesUsed();
+      unreservedBytes += deviceTask.ramBytesUsed();
     }
 
     private void flush(Comparator<DeviceEntry> comparator) {
@@ -266,7 +272,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
         deviceEntryIndexes.add(deviceTask.deviceEntryIndex);
       }
       pendingDeviceTasks.clear();
-      estimatedSizeInBytes = 0;
+      releaseDeviceTaskMemory();
     }
 
     private void sortPendingDeviceTasks(Comparator<DeviceEntry> comparator) {
@@ -286,8 +292,37 @@ public class ExternalTsFileQueryResource implements AutoCloseable {
       }
     }
 
-    private long getEstimatedSizeInBytes() {
-      return estimatedSizeInBytes;
+    private boolean shouldFlush() {
+      if (getPendingMemoryBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) {
+        return true;
+      }
+      if (unreservedBytes < MEMORY_RESERVE_BATCH_SIZE_IN_BYTES) {
+        return false;
+      }
+      return !reserveUnreservedMemory();
+    }
+
+    private boolean reserveUnreservedMemory() {
+      try {
+        queryContext.reserveMemoryForFrontEndImmediately(unreservedBytes);
+      } catch (MemoryNotEnoughException e) {
+        return false;
+      }
+      reservedBytes += unreservedBytes;
+      unreservedBytes = 0;
+      return true;
+    }
+
+    private long getPendingMemoryBytes() {
+      return reservedBytes + unreservedBytes;
+    }
+
+    private void releaseDeviceTaskMemory() {
+      if (reservedBytes != 0) {
+        queryContext.releaseMemoryReservedForFrontEnd(reservedBytes);
+        reservedBytes = 0;
+      }
+      unreservedBytes = 0;
     }
 
     private boolean hasDeviceTasks() {

Reply via email to