This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9a63ebf29181b706dc638343134840419cb1f61c Author: shuwenwei <[email protected]> AuthorDate: Thu Jun 11 10:36:19 2026 +0800 fix --- .../operator/source/AlignedSeriesScanUtil.java | 4 +- .../execution/operator/source/FileLoaderUtils.java | 22 ++- .../relational/ExternalTsFileSeriesScanUtil.java | 26 +-- .../relational/MultiTsFileResourceIterator.java | 210 --------------------- .../readTsFile/ExternalTsFileQueryResource.java | 81 ++++---- .../buffer/TimeSeriesMetadataCache.java | 81 ++++---- .../execution/MemoryEstimationHelper.java | 2 +- 7 files changed, 110 insertions(+), 316 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index f7ddaee472e..e80f305e73b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -38,6 +38,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class AlignedSeriesScanUtil extends SeriesScanUtil { @@ -100,7 +101,8 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { context, scanOptions.getGlobalTimeFilter(), isSeq, - ignoreAllNullRows); + ignoreAllNullRows, + Optional.empty()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java index dcc38e49a61..79235360cdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -110,8 +111,7 @@ public class FileLoaderUtils { context.ignoreNotExistsDevice() || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, context.isDebug(), - context, - context.isExternalTsFileScan()); + context); if (timeSeriesMetadata != null) { long t2 = System.nanoTime(); List<ModEntry> pathModifications = @@ -199,7 +199,8 @@ public class FileLoaderUtils { FragmentInstanceContext context, Filter globalTimeFilter, boolean isSeq, - boolean ignoreAllNullRows) + boolean ignoreAllNullRows, + Optional<long[]> rootMeasurementMetadataIndexNodeOffset) throws IOException { final long t1 = System.nanoTime(); boolean loadFromMem = false; @@ -213,7 +214,12 @@ public class FileLoaderUtils { if (resource.isClosed()) { alignedTimeSeriesMetadata = loadAlignedTimeSeriesMetadataFromDisk( - resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows); + resource, + alignedPath, + context, + globalTimeFilter, + ignoreAllNullRows, + rootMeasurementMetadataIndexNodeOffset); } else { // if the tsfile is unclosed, we just get it directly from TsFileResource loadFromMem = true; alignedTimeSeriesMetadata = @@ -286,7 +292,8 @@ public class FileLoaderUtils { AlignedFullPath alignedPath, FragmentInstanceContext context, Filter globalTimeFilter, - boolean ignoreAllNullRows) + boolean ignoreAllNullRows, + Optional<long[]> rootMeasurementMetadataIndexNodeOffset) throws IOException { AbstractAlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null; // load all the TimeseriesMetadata of vector, the first one is for time column and the @@ -298,7 +305,6 @@ public class FileLoaderUtils { boolean isDebug = context.isDebug(); String filePath = resource.getTsFilePath(); IDeviceID deviceId = alignedPath.getDeviceId(); - boolean isExternalTsFile = context.isExternalTsFileScan(); // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex // we should not ignore the non-exist of device in TsFileMetadata @@ -311,7 +317,7 @@ public class FileLoaderUtils { || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, isDebug, context, - isExternalTsFile); + rootMeasurementMetadataIndexNodeOffset); if (timeColumn != null) { // only need time column, like count_time aggregation if (valueMeasurementList.isEmpty()) { @@ -341,7 +347,7 @@ public class FileLoaderUtils { || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, isDebug, context, - isExternalTsFile); + rootMeasurementMetadataIndexNodeOffset); exist = (exist || (valueColumn != null)); valueTimeSeriesMetadataList.add(valueColumn); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java index d4d327be703..c22384160ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -36,6 +36,7 @@ import org.apache.tsfile.read.filter.basic.Filter; import java.io.IOException; import java.util.List; +import java.util.Optional; public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { @@ -53,24 +54,6 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { this.metadataLoader = metadataLoader; } - public ExternalTsFileSeriesScanUtil( - AlignedFullPath seriesPath, - Ordering scanOrder, - SeriesScanOptions scanOptions, - FragmentInstanceContext context, - boolean queryAllSensors, - List<TSDataType> givenDataTypes, - MultiTsFileResourceIterator resourceIterator) { - this( - seriesPath, - scanOrder, - scanOptions, - context, - queryAllSensors, - givenDataTypes, - resourceIterator::loadTimeSeriesMetadata); - } - @Override protected AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, boolean isSeq) throws IOException { @@ -94,16 +77,15 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { return null; } - long[] deviceMeasurementNodeOffset = currentDeviceOffset.getMeasurementNodeOffset(); - // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata - // loading in this branch. return FileLoaderUtils.loadAlignedTimeSeriesMetadata( resource, alignedPath, context, globalTimeFilter, resource.isSeq(), - context.isIgnoreAllNullRows()); + context.isIgnoreAllNullRows(), + Optional.of( + new long[] {currentDeviceOffset.getStartOffset(), currentDeviceOffset.getEndOffset()})); } @FunctionalInterface diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java deleted file mode 100644 index c55d336a224..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.execution.operator.source.relational; - -import org.apache.iotdb.commons.path.AlignedFullPath; -import org.apache.iotdb.commons.schema.filter.SchemaFilter; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.read.LazyTsFileDeviceIterator; -import org.apache.tsfile.read.TsFileSequenceReader; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.LongConsumer; - -public class MultiTsFileResourceIterator { - - private final String tableName; - private final FragmentInstanceContext fragmentInstanceContext; - private final SeriesScanOptions seriesScanOptions; - private final SchemaFilter deviceFilter; - private final ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = - new ExternalTsFileDeviceFilterVisitor(); - private final Map<TsFileResource, TsFileResourceDeviceIterator> deviceIteratorMap = - new HashMap<>(); - - private IDeviceID currentDevice; - - public MultiTsFileResourceIterator( - String tableName, - List<TsFileResource> seqResources, - List<TsFileResource> unseqResources, - Map<TsFileResource, TsFileSequenceReader> resourceReaderMap, - FragmentInstanceContext fragmentInstanceContext, - SeriesScanOptions seriesScanOptions, - SchemaFilter deviceFilter) { - this.tableName = tableName; - this.fragmentInstanceContext = fragmentInstanceContext; - this.seriesScanOptions = seriesScanOptions; - this.deviceFilter = deviceFilter; - initDeviceIterators(seqResources, resourceReaderMap); - initDeviceIterators(unseqResources, resourceReaderMap); - } - - private void initDeviceIterators( - List<TsFileResource> resources, Map<TsFileResource, TsFileSequenceReader> resourceReaderMap) { - for (TsFileResource resource : resources) { - try { - TsFileSequenceReader reader = resourceReaderMap.get(resource); - if (reader == null) { - throw new IllegalArgumentException( - "Missing external TsFile reader: " + resource.getTsFilePath()); - } - deviceIteratorMap.put(resource, new TsFileResourceDeviceIterator(resource, reader)); - } catch (IOException e) { - throw new RuntimeException( - "Failed to create device iterator for external TsFile: " + resource.getTsFilePath(), e); - } - } - } - - public boolean hasNextDevice() { - for (TsFileResourceDeviceIterator iterator : deviceIteratorMap.values()) { - if (iterator.hasNextDevice() - || (iterator.getCurrentDevice() != null - && !iterator.getCurrentDevice().equals(currentDevice))) { - return true; - } - } - return false; - } - - public IDeviceID nextDevice() { - IDeviceID nextDevice = null; - List<TsFileResource> exhaustedResources = new ArrayList<>(); - for (Map.Entry<TsFileResource, TsFileResourceDeviceIterator> entry : - deviceIteratorMap.entrySet()) { - TsFileResource resource = entry.getKey(); - TsFileResourceDeviceIterator iterator = entry.getValue(); - if (iterator.getCurrentDevice() == null - || iterator.getCurrentDevice().equals(currentDevice)) { - if (iterator.hasNextDevice()) { - if (iterator.nextDevice() == null) { - exhaustedResources.add(resource); - continue; - } - } else { - exhaustedResources.add(resource); - continue; - } - } - if (nextDevice == null || nextDevice.compareTo(iterator.getCurrentDevice()) > 0) { - nextDevice = iterator.getCurrentDevice(); - } - } - for (TsFileResource resource : exhaustedResources) { - deviceIteratorMap.remove(resource); - } - currentDevice = nextDevice; - return currentDevice; - } - - public IDeviceID getCurrentDevice() { - return currentDevice; - } - - public AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( - TsFileResource resource, AlignedFullPath alignedPath) throws IOException { - TsFileResourceDeviceIterator iterator = deviceIteratorMap.get(resource); - if (iterator == null - || currentDevice == null - || !currentDevice.equals(iterator.getCurrentDevice())) { - return null; - } - return iterator.loadTimeSeriesMetadata(alignedPath); - } - - public long[] getCurrentDeviceMeasurementNodeOffset(TsFileResource resource) { - TsFileResourceDeviceIterator iterator = deviceIteratorMap.get(resource); - if (iterator == null - || currentDevice == null - || !currentDevice.equals(iterator.getCurrentDevice())) { - return null; - } - return iterator.getCurrentDeviceMeasurementNodeOffset(); - } - - private boolean isDeviceMatched(IDeviceID deviceID) { - return deviceFilter == null - || Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID)); - } - - private class TsFileResourceDeviceIterator { - - private final TsFileResource resource; - private final LazyTsFileDeviceIterator deviceIterator; - private IDeviceID currentDevice; - - private TsFileResourceDeviceIterator(TsFileResource resource, TsFileSequenceReader reader) - throws IOException { - this.resource = resource; - LongConsumer ioSizeRecorder = - fragmentInstanceContext.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize() - ::addAndGet; - this.deviceIterator = new LazyTsFileDeviceIterator(reader, tableName, ioSizeRecorder); - } - - private boolean hasNextDevice() { - return deviceIterator.hasNext(); - } - - private IDeviceID nextDevice() { - while (deviceIterator.hasNext()) { - IDeviceID nextDevice = deviceIterator.next(); - if (isDeviceMatched(nextDevice)) { - currentDevice = nextDevice; - return currentDevice; - } - } - currentDevice = null; - return null; - } - - private IDeviceID getCurrentDevice() { - return currentDevice; - } - - private long[] getCurrentDeviceMeasurementNodeOffset() { - return deviceIterator.getCurrentDeviceMeasurementNodeOffset(); - } - - private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(AlignedFullPath alignedPath) - throws IOException { - // TODO: Pass getCurrentDeviceMeasurementNodeOffset() to FileLoaderUtils after this branch - // supports offset-based metadata loading. - return FileLoaderUtils.loadAlignedTimeSeriesMetadata( - resource, - alignedPath, - fragmentInstanceContext, - seriesScanOptions.getGlobalTimeFilter(), - resource.isSeq(), - fragmentInstanceContext.isIgnoreAllNullRows()); - } - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java index bd45d94e188..4d85a11dbc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; +import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -33,7 +34,9 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.LazyTsFileDeviceIterator; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.BufferedInputStream; @@ -67,9 +70,7 @@ import static java.util.Objects.requireNonNull; public class ExternalTsFileQueryResource implements AutoCloseable { public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile"; - private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 1; - // private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; - private static final long DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES = 32L; + private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; private final QueryId queryId; private final Path queryTempRoot; @@ -78,10 +79,10 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private final List<TsFileResource> tsFileResources; private final LongConsumer ioSizeRecorder; private final List<DeviceEntry> deviceEntries = new ArrayList<>(); - private List<DeviceTaskPartition> deviceTaskPartitions = Collections.emptyList(); + private final List<DeviceTaskPartition> deviceTaskPartitions = new ArrayList<>(); private Comparator<DeviceEntry> deviceEntryComparator; - private boolean closed; + private volatile boolean closed; public ExternalTsFileQueryResource( QueryId queryId, @@ -122,8 +123,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { int deviceEntryIndex = deviceEntries.size(); deviceEntries.add(deviceEntry); DeviceTask deviceTask = - new DeviceTask( - deviceEntryIndex, new ArrayList<>(deviceCollector.getCurrentDeviceOffsets())); + new DeviceTask(deviceEntryIndex, deviceCollector.getCurrentDeviceOffsets()); DeviceTaskPartition partition = deviceTaskPartitions.get( Math.floorMod(deviceID.hashCode(), deviceTaskPartitions.size())); @@ -245,7 +245,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private void add(DeviceTask deviceTask) { pendingDeviceTasks.add(deviceTask); - estimatedSizeInBytes += estimateDeviceTaskSize(deviceTask); + estimatedSizeInBytes += deviceTask.ramBytesUsed(); } private void flush(Comparator<DeviceEntry> comparator) { @@ -332,7 +332,6 @@ public class ExternalTsFileQueryResource implements AutoCloseable { throw new IllegalArgumentException( "External TsFile device task partition count must be positive"); } - deviceTaskPartitions = new ArrayList<>(partitionCount); for (int i = 0; i < partitionCount; i++) { deviceTaskPartitions.add(new DeviceTaskPartition(i)); } @@ -365,16 +364,6 @@ public class ExternalTsFileQueryResource implements AutoCloseable { return runFile; } - private static long estimateDeviceTaskSize(DeviceTask deviceTask) { - long size = 64L; - for (DeviceOffset offset : deviceTask.deviceOffsets) { - size += - DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES - + (long) Long.BYTES * offset.measurementNodeOffset.length; - } - return size; - } - public class DeviceTaskRunReader implements AutoCloseable { private final PriorityQueue<DeviceTaskRunCursor> runCursors; @@ -572,7 +561,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private final Map<Integer, LazyTsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); private IDeviceID currentDevice; - private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + private List<DeviceOffset> currentDeviceOffsets; private DeviceCollector() { try { @@ -637,7 +626,9 @@ public class ExternalTsFileQueryResource implements AutoCloseable { && currentDevice.equals(deviceIterator.getCurrentDeviceID())) { deviceOffsets.add( new DeviceOffset( - entry.getKey(), deviceIterator.getCurrentDeviceMeasurementNodeOffset())); + entry.getKey(), + deviceIterator.getCurrentDeviceMeasurementNodeOffset()[0], + deviceIterator.getCurrentDeviceMeasurementNodeOffset()[1])); } } currentDeviceOffsets = deviceOffsets; @@ -654,7 +645,10 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } - private static class DeviceTask { + private static class DeviceTask implements Accountable { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DeviceTask.class); private final int deviceEntryIndex; private final List<DeviceOffset> deviceOffsets; @@ -669,10 +663,8 @@ public class ExternalTsFileQueryResource implements AutoCloseable { ReadWriteIOUtils.write(deviceOffsets.size(), outputStream); for (DeviceOffset offset : deviceOffsets) { ReadWriteIOUtils.write(offset.fileIndex, outputStream); - ReadWriteIOUtils.write(offset.measurementNodeOffset.length, outputStream); - for (long measurementNodeOffset : offset.measurementNodeOffset) { - ReadWriteIOUtils.write(measurementNodeOffset, outputStream); - } + ReadWriteIOUtils.write(offset.startOffset, outputStream); + ReadWriteIOUtils.write(offset.endOffset, outputStream); } } @@ -682,33 +674,48 @@ public class ExternalTsFileQueryResource implements AutoCloseable { List<DeviceOffset> offsets = new ArrayList<>(offsetSize); for (int i = 0; i < offsetSize; i++) { int fileIndex = ReadWriteIOUtils.readInt(inputStream); - int measurementNodeOffsetLength = ReadWriteIOUtils.readInt(inputStream); - long[] measurementNodeOffset = new long[measurementNodeOffsetLength]; - for (int j = 0; j < measurementNodeOffsetLength; j++) { - measurementNodeOffset[j] = inputStream.readLong(); - } - offsets.add(new DeviceOffset(fileIndex, measurementNodeOffset)); + long startOffset = inputStream.readLong(); + long endOffset = inputStream.readLong(); + offsets.add(new DeviceOffset(fileIndex, startOffset, endOffset)); } return new DeviceTask(deviceEntryIndex, offsets); } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.ARRAY_LIST_INSTANCE_SIZE + + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + + (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF * deviceOffsets.size() + + deviceOffsets.size() * DeviceOffset.INSTANCE_SIZE; + } } public static class DeviceOffset { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(DeviceOffset.class); + private final int fileIndex; - private final long[] measurementNodeOffset; + private final long startOffset; + private final long endOffset; - private DeviceOffset(int fileIndex, long[] measurementNodeOffset) { + private DeviceOffset(int fileIndex, long startOffset, long endOffset) { this.fileIndex = fileIndex; - this.measurementNodeOffset = measurementNodeOffset; + this.startOffset = startOffset; + this.endOffset = endOffset; } public int getFileIndex() { return fileIndex; } - public long[] getMeasurementNodeOffset() { - return measurementNodeOffset; + public long getStartOffset() { + return startOffset; + } + + public long getEndOffset() { + return endOffset; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java index ba26db0f1d7..7eacf5928f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java @@ -50,6 +50,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -119,7 +120,7 @@ public class TimeSeriesMetadataCache { boolean debug, QueryContext queryContext) throws IOException { - return get(filePath, key, allSensors, ignoreNotExists, debug, queryContext, false); + return get(filePath, key, allSensors, ignoreNotExists, debug, queryContext, Optional.empty()); } @SuppressWarnings({"squid:S1860", "squid:S6541", "squid:S3776"}) // Suppress synchronize warning @@ -130,7 +131,7 @@ public class TimeSeriesMetadataCache { boolean ignoreNotExists, boolean debug, QueryContext queryContext, - boolean externalTsFile) + Optional<long[]> deviceMetadataIndexNodeOffset) throws IOException { long startTime = System.nanoTime(); long loadBloomFilterTime = 0; @@ -139,6 +140,7 @@ public class TimeSeriesMetadataCache { LongConsumer bloomFilterIoSizeRecorder = queryContext.getQueryStatistics().getLoadBloomFilterActualIOSize()::addAndGet; boolean cacheHit = true; + boolean externalTsFile = queryContext.isExternalTsFileScan(); try { if (!CACHE_ENABLE || externalTsFile) { String deviceStringFormat = key.device.toString(); @@ -148,19 +150,25 @@ public class TimeSeriesMetadataCache { TsFileSequenceReader reader = FileReaderManager.getInstance() .get(filePath, key.tsFileID, true, bloomFilterIoSizeRecorder, externalTsFile); - BloomFilter bloomFilter = reader.readBloomFilter(bloomFilterIoSizeRecorder); - queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount().incrementAndGet(); - if (bloomFilter != null - && !bloomFilter.contains( - deviceStringFormat + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { + if (!deviceMetadataIndexNodeOffset.isPresent()) { + BloomFilter bloomFilter = reader.readBloomFilter(bloomFilterIoSizeRecorder); + queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount().incrementAndGet(); + if (bloomFilter != null + && !bloomFilter.contains( + deviceStringFormat + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { + loadBloomFilterTime = System.nanoTime() - startTime; + return null; + } loadBloomFilterTime = System.nanoTime() - startTime; - return null; } - loadBloomFilterTime = System.nanoTime() - startTime; TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata( - key.device, key.measurement, ignoreNotExists, timeSeriesMetadataIoSizeRecorder); + key.device, + deviceMetadataIndexNodeOffset, + key.measurement, + ignoreNotExists, + timeSeriesMetadataIoSizeRecorder); return (timeseriesMetadata == null || timeseriesMetadata.getStatistics().getCount() == 0) ? null : timeseriesMetadata; @@ -184,41 +192,40 @@ public class TimeSeriesMetadataCache { if (timeseriesMetadata == null) { cacheHit = false; - long loadBloomFilterStartTime = System.nanoTime(); - // bloom filter part - BloomFilter bloomFilter = - BloomFilterCache.getInstance() - .get( - new BloomFilterCache.BloomFilterCacheKey(filePath, key.tsFileID), - debug, - bloomFilterIoSizeRecorder, - queryContext.getQueryStatistics().getLoadBloomFilterFromCacheCount() - ::addAndGet, - queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount() - ::addAndGet, - externalTsFile); - if (bloomFilter != null - && !bloomFilter.contains( - deviceStringFormat + TsFileConstant.PATH_SEPARATOR + key.measurement)) { - if (debug) { - DEBUG_LOGGER.info(StorageEngineMessages.TS_METADATA_FILTERED_BY_BLOOM_FILTER, key); + if (!deviceMetadataIndexNodeOffset.isPresent()) { + long loadBloomFilterStartTime = System.nanoTime(); + // bloom filter part + BloomFilter bloomFilter = + BloomFilterCache.getInstance() + .get( + new BloomFilterCache.BloomFilterCacheKey(filePath, key.tsFileID), + debug, + bloomFilterIoSizeRecorder, + queryContext.getQueryStatistics().getLoadBloomFilterFromCacheCount() + ::addAndGet, + queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount() + ::addAndGet, + false); + if (bloomFilter != null + && !bloomFilter.contains( + deviceStringFormat + TsFileConstant.PATH_SEPARATOR + key.measurement)) { + if (debug) { + DEBUG_LOGGER.info( + StorageEngineMessages.TS_METADATA_FILTERED_BY_BLOOM_FILTER, key); + } + loadBloomFilterTime = System.nanoTime() - loadBloomFilterStartTime; + return null; } + loadBloomFilterTime = System.nanoTime() - loadBloomFilterStartTime; - return null; } - - loadBloomFilterTime = System.nanoTime() - loadBloomFilterStartTime; TsFileSequenceReader reader = FileReaderManager.getInstance() - .get( - filePath, - key.tsFileID, - true, - timeSeriesMetadataIoSizeRecorder, - externalTsFile); + .get(filePath, key.tsFileID, true, timeSeriesMetadataIoSizeRecorder, false); List<TimeseriesMetadata> timeSeriesMetadataList = reader.readTimeseriesMetadata( key.device, + deviceMetadataIndexNodeOffset, key.measurement, allSensors, ignoreNotExists, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/execution/MemoryEstimationHelper.java index 95796129011..e1436a11538 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/execution/MemoryEstimationHelper.java @@ -44,7 +44,7 @@ public class MemoryEstimationHelper { private static final long MEASUREMENT_PATH_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(AlignedPath.class); - private static final long ARRAY_LIST_INSTANCE_SIZE = + public static final long ARRAY_LIST_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(ArrayList.class); private static final long INTEGER_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(Integer.class);
