This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5e7bc0af91559f7c67f2672c8b52399aae4da48c Author: shuwenwei <[email protected]> AuthorDate: Fri Jun 12 11:10:56 2026 +0800 fix mem control --- .../ExternalTsFileAggTableScanOperator.java | 3 ++- .../ExternalTsFileTableScanOperator.java | 3 ++- .../readTsFile/ExternalTsFileQueryDataSource.java | 28 ++++------------------ .../readTsFile/ExternalTsFileQueryResource.java | 13 ++++++---- 4 files changed, 18 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java index f6e4a84fc75..a3e7566fc93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.E import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; @@ -118,7 +119,7 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera && deviceEntries.get(currentDeviceIndex) != null; } - private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() { + private QueryDataSource updateCurrentDeviceQueryDataSource() { try { if (!deviceTaskReader.nextDevice()) { throw new IllegalStateException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index 73a9cb11ad5..59d072b1362 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.E import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; @@ -105,7 +106,7 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { } } - private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() { + private QueryDataSource updateCurrentDeviceQueryDataSource() { try { if (!deviceTaskReader.nextDevice()) { throw new IllegalStateException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java index ac55a8cbfe2..fd18da081c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java @@ -21,47 +21,29 @@ package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.read.filter.basic.Filter; import java.util.Collections; -import java.util.List; public class ExternalTsFileQueryDataSource extends QueryDataSource { private final ExternalTsFileQueryResource externalTsFileQueryResource; public ExternalTsFileQueryDataSource(ExternalTsFileQueryResource externalTsFileQueryResource) { - this(externalTsFileQueryResource, Collections.emptyList()); - } - - ExternalTsFileQueryDataSource( - ExternalTsFileQueryResource externalTsFileQueryResource, - List<TsFileResource> unseqResources) { - super(Collections.emptyList(), unseqResources); + super(Collections.emptyList(), Collections.emptyList()); this.externalTsFileQueryResource = externalTsFileQueryResource; } - @Override - public IQueryDataSource clone() { - return new ExternalTsFileQueryDataSource(externalTsFileQueryResource, getUnseqResources()); - } - public ExternalTsFileQueryResource getExternalTsFileQueryResource() { return externalTsFileQueryResource; } @Override - public boolean isSeqSatisfied( - IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { - return true; + public IQueryDataSource clone() { + return new ExternalTsFileQueryDataSource(externalTsFileQueryResource); } @Override - public boolean isUnSeqSatisfied( - IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { - return true; + public boolean isEmpty() { + return externalTsFileQueryResource.getTsFileResources().isEmpty(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java index 74f39f73613..9e3208a7251 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -112,6 +113,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { public void collectDeviceEntries( SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int partitionCount) { checkNotClosed(); + acquireMemoryForTsFileReaders(); ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = new ExternalTsFileDeviceFilterVisitor(); try (DeviceCollector deviceCollector = new DeviceCollector()) { createDeviceTaskPartitions(partitionCount); @@ -139,6 +141,10 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } + private void acquireMemoryForTsFileReaders() { + queryContext.reserveMemoryForFrontEndImmediately((long) tsFilePaths.size() * 4 * 1024); + } + public DeviceTaskRunReader getDeviceTaskRunReader(int partitionIndex) { checkNotClosed(); DeviceTaskPartition partition = getDeviceTaskPartition(partitionIndex); @@ -403,7 +409,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private final PriorityQueue<DeviceTaskRunCursor> runCursors; private DeviceEntry currentDevice; - private ExternalTsFileQueryDataSource currentDeviceQueryDataSource; + private QueryDataSource currentDeviceQueryDataSource; private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap; private DeviceTaskRunReader(DeviceTaskPartition partition) throws IOException { @@ -452,8 +458,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { unseqResources.add(tsFileResource); currentDeviceOffsetMap.put(tsFileResource, deviceOffset); } - currentDeviceQueryDataSource = - new ExternalTsFileQueryDataSource(ExternalTsFileQueryResource.this, unseqResources); + currentDeviceQueryDataSource = new QueryDataSource(Collections.emptyList(), unseqResources); return true; } @@ -461,7 +466,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { return currentDevice; } - public ExternalTsFileQueryDataSource getCurrentDeviceQueryDataSource() { + public QueryDataSource getCurrentDeviceQueryDataSource() { return currentDeviceQueryDataSource; }
