This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 80fce1a741ddb740ceb9e7c43817bddb0aae7896 Author: shuwenwei <[email protected]> AuthorDate: Thu Jun 11 15:34:59 2026 +0800 add memory control --- .../db/queryengine/common/MPPQueryContext.java | 6 ++- .../readTsFile/ExternalTsFileQueryResource.java | 57 +++++++++++++++++----- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 8b1303ef44e..68d2de1c2a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -261,7 +261,7 @@ public class MPPQueryContext implements IAuditEntity { } ExternalTsFileQueryResource externalTsFileQueryResource = new ExternalTsFileQueryResource( - queryId, + this, Paths.get(IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()) .resolve(ExternalTsFileQueryResource.EXTERNAL_TSFILE_TMP_DIR) .resolve(queryId.getId()) @@ -517,6 +517,10 @@ public class MPPQueryContext implements IAuditEntity { this.memoryReservationManager.reserveMemoryImmediately(); } + public void reserveMemoryForFrontEndImmediately(final long bytes) { + this.memoryReservationManager.reserveMemoryImmediately(bytes); + } + public void releaseAllMemoryReservedForFrontEnd() { this.memoryReservationManager.releaseAllReservedMemory(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java index 4d85a11dbc6..74f39f73613 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; @@ -70,9 +72,9 @@ import static java.util.Objects.requireNonNull; public class ExternalTsFileQueryResource implements AutoCloseable { public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile"; - private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; private final QueryId queryId; + private final MPPQueryContext queryContext; private final Path queryTempRoot; private final String tableName; private final List<String> tsFilePaths; @@ -85,18 +87,18 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private volatile boolean closed; public ExternalTsFileQueryResource( - QueryId queryId, + MPPQueryContext queryContext, Path tempRoot, String tableName, List<String> tsFilePaths, LongConsumer ioSizeRecorder, boolean useExactTempRoot) { - this.queryId = queryId; + this.queryContext = requireNonNull(queryContext, "queryContext is null"); + this.queryId = queryContext.getQueryId(); this.queryTempRoot = useExactTempRoot ? 
requireNonNull(tempRoot, "tempRoot is null") - : requireNonNull(tempRoot, "tempRoot is null") - .resolve(requireNonNull(queryId, "queryId is null").getId()); + : requireNonNull(tempRoot, "tempRoot is null").resolve(this.queryId.getId()); this.tableName = tableName; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths"))); @@ -128,7 +130,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { deviceTaskPartitions.get( Math.floorMod(deviceID.hashCode(), deviceTaskPartitions.size())); partition.add(deviceTask); - if (partition.getEstimatedSizeInBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) { + if (partition.shouldFlush()) { partition.flush(comparator); } } @@ -225,11 +227,15 @@ public class ExternalTsFileQueryResource implements AutoCloseable { public class DeviceTaskPartition { + private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; + private static final long MEMORY_RESERVE_BATCH_SIZE_IN_BYTES = 1024 * 1024; + private final int partitionIndex; private final List<DeviceTask> pendingDeviceTasks = new ArrayList<>(); private final List<Integer> deviceEntryIndexes = new ArrayList<>(); private final List<Path> runFiles = new ArrayList<>(); - private long estimatedSizeInBytes; + private long reservedBytes; + private long unreservedBytes; private DeviceTaskPartition(int partitionIndex) { this.partitionIndex = partitionIndex; @@ -245,7 +251,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private void add(DeviceTask deviceTask) { pendingDeviceTasks.add(deviceTask); - estimatedSizeInBytes += deviceTask.ramBytesUsed(); + unreservedBytes += deviceTask.ramBytesUsed(); } private void flush(Comparator<DeviceEntry> comparator) { @@ -266,7 +272,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { deviceEntryIndexes.add(deviceTask.deviceEntryIndex); } pendingDeviceTasks.clear(); - estimatedSizeInBytes = 0; + releaseDeviceTaskMemory(); 
} private void sortPendingDeviceTasks(Comparator<DeviceEntry> comparator) { @@ -286,8 +292,37 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } - private long getEstimatedSizeInBytes() { - return estimatedSizeInBytes; + private boolean shouldFlush() { + if (getPendingMemoryBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) { + return true; + } + if (unreservedBytes < MEMORY_RESERVE_BATCH_SIZE_IN_BYTES) { + return false; + } + return !reserveUnreservedMemory(); + } + + private boolean reserveUnreservedMemory() { + try { + queryContext.reserveMemoryForFrontEndImmediately(unreservedBytes); + } catch (MemoryNotEnoughException e) { + return false; + } + reservedBytes += unreservedBytes; + unreservedBytes = 0; + return true; + } + + private long getPendingMemoryBytes() { + return reservedBytes + unreservedBytes; + } + + private void releaseDeviceTaskMemory() { + if (reservedBytes != 0) { + queryContext.releaseMemoryReservedForFrontEnd(reservedBytes); + reservedBytes = 0; + } + unreservedBytes = 0; } private boolean hasDeviceTasks() {
