This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch MetadataIndex in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c1252232a04cb34c42a97b505fc8162f617c2c3 Author: JackieTien97 <[email protected]> AuthorDate: Sun Apr 25 15:24:46 2021 +0800 init --- .../cluster/query/reader/ClusterReaderFactory.java | 3 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 119 +++++++++++++++++ .../db/engine/storagegroup/TsFileResource.java | 15 +-- .../db/query/executor/fill/LastPointReader.java | 48 ++----- .../query/reader/series/SeriesAggregateReader.java | 2 +- .../reader/series/SeriesRawDataBatchReader.java | 4 +- .../iotdb/db/query/reader/series/SeriesReader.java | 7 +- .../reader/series/SeriesReaderByTimestamp.java | 2 +- .../query/reader/series/SeriesReaderFactory.java | 94 -------------- .../db/query/reader/series/VectorSeriesReader.java | 144 --------------------- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 93 ++++++++++++- .../tsfile/file/metadata/ITimeSeriesMetadata.java | 3 + .../file/metadata/VectorTimeSeriesMetadata.java | 16 +++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 10 ++ 14 files changed, 264 insertions(+), 296 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index bd45755..261f5cd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -63,7 +63,6 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.db.query.reader.series.SeriesRawDataPointReader; import org.apache.iotdb.db.query.reader.series.SeriesReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; -import org.apache.iotdb.db.query.reader.series.SeriesReaderFactory; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -515,7 +514,7 @@ public class ClusterReaderFactory { ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header); QueryDataSource queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter); - return SeriesReaderFactory.createSeriesReader( + return new SeriesReader( path, allSensors, dataType, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java index 88d076d..2b5ed33 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -225,6 +226,124 @@ public class TimeSeriesMetadataCache { } } + @SuppressWarnings("squid:S1860") // Suppress synchronize warning + public List<TimeseriesMetadata> get( + TimeSeriesMetadataCacheKey key, + List<String> subSensorList, + Set<String> allSensors, + boolean debug) + throws IOException { + if (!CACHE_ENABLE) { + // bloom filter part + TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); + BloomFilter bloomFilter = reader.readBloomFilter(); + if (bloomFilter != null + && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { + return null; + } + return reader.readTimeseriesMetadata(new Path(key.device, key.measurement), subSensorList); + } + + cacheRequestNum.incrementAndGet(); + + List<TimeseriesMetadata> res = new ArrayList<>(); + + getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res); + + if (!res.isEmpty()) { + cacheHitNum.incrementAndGet(); + printCacheLog(true); + } else { + if (debug) { + DEBUG_LOGGER.info( + "Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath); + DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors); + } + // allow for the parallelism of different devices + synchronized ( + devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) { + // double check + getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res); + if (!res.isEmpty()) { + cacheHitNum.incrementAndGet(); + printCacheLog(true); + } else { + Path path = new Path(key.device, key.measurement); + // bloom filter part + TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); + BloomFilter bloomFilter = reader.readBloomFilter(); + if (bloomFilter != null && !bloomFilter.contains(path.getFullPath())) { + if (debug) { + DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key); + } + return null; + } + printCacheLog(false); + List<TimeseriesMetadata> timeSeriesMetadataList = + reader.readTimeseriesMetadata(path, allSensors); + // put TimeSeriesMetadata of all sensors used in this query into cache + lock.writeLock().lock(); + try { + timeSeriesMetadataList.forEach( + metadata -> { + TimeSeriesMetadataCacheKey k = + new TimeSeriesMetadataCacheKey( + key.filePath, key.device, metadata.getMeasurementId()); + if (!lruCache.containsKey(k)) { + lruCache.put(k, metadata); + } + }); + getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res); + } finally { + lock.writeLock().unlock(); + } + } + } + } + if (res.isEmpty()) { + if (debug) { + DEBUG_LOGGER.info("The file doesn't have this time series {}.", key); + } + return null; + } else { + if (debug) { + DEBUG_LOGGER.info( + "Get timeseries: {}.{} metadata in file: {} from cache: {}.", + key.device, + key.measurement, + key.filePath, + res); + } + for (int i = 0; i < res.size(); i++) { + res.set(i, new TimeseriesMetadata(res.get(i))); + } + return res; + } + } + + private void getVectorTimeSeriesMetadataListFromCache( + TimeSeriesMetadataCacheKey key, List<String> subSensorList, List<TimeseriesMetadata> res) { + lock.readLock().lock(); + try { + TimeseriesMetadata timeseriesMetadata = lruCache.get(key); + if (timeseriesMetadata != null) { + res.add(timeseriesMetadata); + for (String subSensor : subSensorList) { + timeseriesMetadata = + lruCache.get(new TimeSeriesMetadataCacheKey(key.filePath, key.device, subSensor)); + if (timeseriesMetadata != null) { + res.add(timeseriesMetadata); + } else { + res.clear(); + break; + } + } + } + } finally { + lock.readLock().unlock(); + } + } + private void printCacheLog(boolean isHit) { if (!logger.isDebugEnabled()) { return; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index c6a7abc..17814b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -667,19 +667,8 @@ public class TsFileResource { * * @return TimeseriesMetadata or the first ValueTimeseriesMetadata in VectorTimeseriesMetadata */ - public TimeseriesMetadata getTimeSeriesMetadata() { - if (timeSeriesMetadata == null) { - return null; - } - if (timeSeriesMetadata instanceof TimeseriesMetadata) { - return (TimeseriesMetadata) timeSeriesMetadata; - } else { - // it's ok for us to return the first value timeseries metadata, - // because the MemChunkMetadataLoader is not depend on the timeseries metadata - return ((VectorTimeSeriesMetadata) timeSeriesMetadata) - .getValueTimeseriesMetadataList() - .get(0); - } + public ITimeSeriesMetadata getTimeSeriesMetadata() { + return timeSeriesMetadata; } public void setUpgradedResources(List<TsFileResource> upgradedResources) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java index e7eb64f..5c3ccb3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java @@ -21,14 +21,11 @@ package org.apache.iotdb.db.query.executor.fill; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -39,7 +36,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.PriorityQueue; import java.util.Set; @@ -58,9 +54,9 @@ public class LastPointReader { private QueryDataSource dataSource; - private ChunkMetadata cachedLastChunk; + private IChunkMetadata cachedLastChunk; - private List<TimeseriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>(); + private List<ITimeSeriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>(); public LastPointReader() {} @@ -85,10 +81,10 @@ public class LastPointReader { TimeValuePair resultPoint = retrieveValidLastPointFromSeqFiles(); UnpackOverlappedUnseqFiles(resultPoint.getTimestamp()); - PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime(); + PriorityQueue<IChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime(); while (!sortedChunkMetatdataList.isEmpty() && resultPoint.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) { - ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll(); + IChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll(); TimeValuePair chunkLastPoint = getChunkLastPoint(chunkMetadata); if (chunkLastPoint.getTimestamp() > resultPoint.getTimestamp() || (chunkLastPoint.getTimestamp() == resultPoint.getTimestamp() @@ -108,23 +104,9 @@ public class LastPointReader { for (int index = seqFileResource.size() - 1; index >= 0; index--) { TsFileResource resource = seqFileResource.get(index); ITimeSeriesMetadata timeseriesMetadata; - if (seriesPath instanceof VectorPartialPath) { - timeseriesMetadata = - new VectorTimeSeriesMetadata( - FileLoaderUtils.loadTimeSeriesMetadata( - resource, seriesPath, context, timeFilter, deviceMeasurements), - Collections.singletonList( - FileLoaderUtils.loadTimeSeriesMetadata( - resource, - ((VectorPartialPath) seriesPath).getSubSensorsPathList().get(0), - context, - timeFilter, - deviceMeasurements))); - } else { - timeseriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( - resource, seriesPath, context, timeFilter, deviceMeasurements); - } + timeseriesMetadata = + FileLoaderUtils.loadTimeSeriesMetadata( + resource, seriesPath, context, timeFilter, deviceMeasurements); if (timeseriesMetadata != null) { if (!timeseriesMetadata.isModified() && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) { @@ -155,7 +137,7 @@ public class LastPointReader { while (!unseqFileResource.isEmpty() && (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) { - TimeseriesMetadata timeseriesMetadata = + ITimeSeriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( unseqFileResource.poll(), seriesPath, context, timeFilter, deviceMeasurements); @@ -175,7 +157,7 @@ public class LastPointReader { } } - private TimeValuePair getChunkLastPoint(ChunkMetadata chunkMetaData) throws IOException { + private TimeValuePair getChunkLastPoint(IChunkMetadata chunkMetaData) throws IOException { TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null); if (chunkMetaData == null) { return lastPoint; @@ -204,7 +186,7 @@ public class LastPointReader { return lastPoint; } - private boolean shouldUpdate(ChunkMetadata cachedChunk, ChunkMetadata newChunk) { + private boolean shouldUpdate(IChunkMetadata cachedChunk, IChunkMetadata newChunk) { return (newChunk.getVersion() > cachedChunk.getVersion()) || (newChunk.getVersion() == cachedChunk.getVersion() && newChunk.getOffsetOfChunkHeader() > cachedChunk.getOffsetOfChunkHeader()); @@ -223,8 +205,8 @@ public class LastPointReader { return unseqTsFilesSet; } - private PriorityQueue<ChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException { - PriorityQueue<ChunkMetadata> chunkMetadataList = + private PriorityQueue<IChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException { + PriorityQueue<IChunkMetadata> chunkMetadataList = new PriorityQueue<>( (o1, o2) -> { long endTime1 = o1.getEndTime(); @@ -241,11 +223,9 @@ public class LastPointReader { ? -1 : Long.compare(o2.getOffsetOfChunkHeader(), o1.getOffsetOfChunkHeader())); }); - for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) { + for (ITimeSeriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) { if (timeseriesMetadata != null) { - for (IChunkMetadata chunkMetadata : timeseriesMetadata.loadChunkMetadataList()) { - chunkMetadataList.add((ChunkMetadata) chunkMetadata); - } + chunkMetadataList.addAll(timeseriesMetadata.loadChunkMetadataList()); } } return chunkMetadataList; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java index a76327d..04cb907 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java @@ -45,7 +45,7 @@ public class SeriesAggregateReader implements IAggregateReader { TsFileFilter fileFilter, boolean ascending) { this.seriesReader = - SeriesReaderFactory.createSeriesReader( + new SeriesReader( seriesPath, allSensors, dataType, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java index d53db04..8698955 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesRawDataBatchReader.java @@ -58,7 +58,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader { TsFileFilter fileFilter, boolean ascending) { this.seriesReader = - SeriesReaderFactory.createSeriesReader( + new SeriesReader( seriesPath, allSensors, dataType, @@ -84,7 +84,7 @@ public class SeriesRawDataBatchReader implements ManagedSeriesReader { Set<String> allSensors = new HashSet<>(); allSensors.add(seriesPath.getMeasurement()); this.seriesReader = - SeriesReaderFactory.createSeriesReader( + new SeriesReader( seriesPath, allSensors, dataType, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 041fa76..3181d10 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -125,6 +124,7 @@ public class SeriesReader { boolean ascending) { this.seriesPath = seriesPath; this.allSensors = allSensors; + this.allSensors.add(seriesPath.getMeasurement()); this.dataType = dataType; this.context = context; QueryUtils.filterQueryDataSource(dataSource, fileFilter); @@ -168,6 +168,7 @@ public class SeriesReader { boolean ascending) { this.seriesPath = seriesPath; this.allSensors = allSensors; + this.allSensors.add(seriesPath.getMeasurement()); this.dataType = dataType; this.context = context; this.timeFilter = timeFilter; @@ -935,7 +936,7 @@ public class SeriesReader { } protected void unpackSeqTsFileResource() throws IOException { - TimeseriesMetadata timeseriesMetadata = + ITimeSeriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( orderUtils.getNextSeqFileResource(seqFileResource, true), seriesPath, @@ -949,7 +950,7 @@ public class SeriesReader { } protected void unpackUnseqTsFileResource() throws IOException { - TimeseriesMetadata timeseriesMetadata = + ITimeSeriesMetadata timeseriesMetadata = FileLoaderUtils.loadTimeSeriesMetadata( unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors); if (timeseriesMetadata != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java index 543c706..27796ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java @@ -48,7 +48,7 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp { UnaryFilter timeFilter = ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE); this.seriesReader = - SeriesReaderFactory.createSeriesReader( + new SeriesReader( seriesPath, allSensors, dataType, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java deleted file mode 100644 index 577e021..0000000 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderFactory.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.query.reader.series; - -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.VectorPartialPath; -import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.filter.TsFileFilter; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; - -import java.util.List; -import java.util.Set; - -public class SeriesReaderFactory { - - private SeriesReaderFactory() {} - - public static SeriesReader createSeriesReader( - PartialPath seriesPath, - Set<String> allSensors, - TSDataType dataType, - QueryContext context, - QueryDataSource dataSource, - Filter timeFilter, - Filter valueFilter, - TsFileFilter fileFilter, - boolean ascending) { - if (seriesPath instanceof VectorPartialPath) { - return new VectorSeriesReader( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - valueFilter, - fileFilter, - ascending); - } else { - return new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - valueFilter, - fileFilter, - ascending); - } - } - - public static SeriesReader createSeriesReader( - PartialPath seriesPath, - Set<String> allSensors, - TSDataType dataType, - QueryContext context, - List<TsFileResource> seqFileResource, - List<TsFileResource> unseqFileResource, - Filter timeFilter, - Filter valueFilter, - boolean ascending) { - return new SeriesReader( - seriesPath, - allSensors, - dataType, - context, - seqFileResource, - unseqFileResource, - timeFilter, - valueFilter, - ascending); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java deleted file mode 100644 index 6de0150..0000000 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesReader.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.query.reader.series; - -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.VectorPartialPath; -import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.filter.TsFileFilter; -import org.apache.iotdb.db.utils.FileLoaderUtils; -import org.apache.iotdb.db.utils.TestOnly; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -public class VectorSeriesReader extends SeriesReader { - - private final VectorPartialPath vectorPartialPath; - - public VectorSeriesReader( - PartialPath seriesPath, - Set<String> allSensors, - TSDataType dataType, - QueryContext context, - QueryDataSource dataSource, - Filter timeFilter, - Filter valueFilter, - TsFileFilter fileFilter, - boolean ascending) { - super( - seriesPath, - allSensors, - dataType, - context, - dataSource, - timeFilter, - valueFilter, - fileFilter, - ascending); - this.allSensors.add(seriesPath.getMeasurement()); - this.vectorPartialPath = (VectorPartialPath) seriesPath; - } - - @TestOnly - VectorSeriesReader( - PartialPath seriesPath, - Set<String> allSensors, - TSDataType dataType, - QueryContext context, - List<TsFileResource> seqFileResource, - List<TsFileResource> unseqFileResource, - Filter timeFilter, - Filter valueFilter, - boolean ascending) { - super( - seriesPath, - allSensors, - dataType, - context, - seqFileResource, - unseqFileResource, - timeFilter, - valueFilter, - ascending); - this.allSensors.add(seriesPath.getMeasurement()); - this.vectorPartialPath = (VectorPartialPath) seriesPath; - } - - @Override - protected void unpackSeqTsFileResource() throws IOException { - TsFileResource resource = orderUtils.getNextSeqFileResource(seqFileResource, true); - TimeseriesMetadata timeseriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( - resource, vectorPartialPath, context, getAnyFilter(), allSensors); - if (timeseriesMetadata != null) { - timeseriesMetadata.setSeq(true); - List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>(); - for (PartialPath subSensor : vectorPartialPath.getSubSensorsPathList()) { - TimeseriesMetadata valueTimeSeriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( - resource, subSensor, context, getAnyFilter(), allSensors); - if (valueTimeSeriesMetadata == null) { - throw new IOException("File doesn't contains value"); - } - valueTimeSeriesMetadata.setSeq(true); - valueTimeseriesMetadataList.add(valueTimeSeriesMetadata); - } - VectorTimeSeriesMetadata vectorTimeSeriesMetadata = - new VectorTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList); - seqTimeSeriesMetadata.add(vectorTimeSeriesMetadata); - } - } - - @Override - protected void unpackUnseqTsFileResource() throws IOException { - TsFileResource resource = unseqFileResource.remove(0); - TimeseriesMetadata timeseriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( - resource, vectorPartialPath, context, getAnyFilter(), allSensors); - if (timeseriesMetadata != null) { - timeseriesMetadata.setModified(true); - timeseriesMetadata.setSeq(false); - List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>(); - for (PartialPath subSensor : vectorPartialPath.getSubSensorsPathList()) { - TimeseriesMetadata valueTimeSeriesMetadata = - FileLoaderUtils.loadTimeSeriesMetadata( - resource, subSensor, context, getAnyFilter(), allSensors); - if (valueTimeSeriesMetadata == null) { - throw new IOException("File contains value"); - } - timeseriesMetadata.setModified(true); - valueTimeSeriesMetadata.setSeq(false); - valueTimeseriesMetadataList.add(valueTimeSeriesMetadata); - } - VectorTimeSeriesMetadata vectorTimeSeriesMetadata = - new VectorTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList); - unSeqTimeSeriesMetadata.add(vectorTimeSeriesMetadata); - } - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 6464c4a..c9c3090 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader; @@ -33,6 +34,7 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; public class FileLoaderUtils { @@ -89,14 +92,26 @@ public class FileLoaderUtils { * @param allSensors measurements queried at the same time of this device * @param filter any filter, only used to check time range */ - public static TimeseriesMetadata loadTimeSeriesMetadata( + public static ITimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter filter, Set<String> allSensors) throws IOException { - TimeseriesMetadata timeSeriesMetadata; + // deal with vector + if (seriesPath instanceof VectorPartialPath) { + return loadVectorTimeSeriesMetadata( + resource, + seriesPath, + ((VectorPartialPath) seriesPath).getSubSensorsPathList(), + context, + filter, + allSensors); + } + + // common path + ITimeSeriesMetadata timeSeriesMetadata; if (resource.isClosed()) { if (!resource.getTsFile().exists()) { return null; @@ -140,6 +155,80 @@ public class FileLoaderUtils { return timeSeriesMetadata; } + private static VectorTimeSeriesMetadata loadVectorTimeSeriesMetadata( + TsFileResource resource, + PartialPath seriesPath, + List<PartialPath> subSensorList, + QueryContext context, + Filter filter, + Set<String> allSensors) + throws IOException { + VectorTimeSeriesMetadata vectorTimeSeriesMetadata = null; + if (resource.isClosed()) { + if (!resource.getTsFile().exists()) { + return null; + } + List<TimeseriesMetadata> timeSeriesMetadata = + TimeSeriesMetadataCache.getInstance() + .get( + new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey( + resource.getTsFilePath(), + seriesPath.getDevice(), + seriesPath.getMeasurement()), + subSensorList.stream() + .map(PartialPath::getMeasurement) + .collect(Collectors.toList()), + allSensors, + context.isDebug()); + if (timeSeriesMetadata != null) { + timeSeriesMetadata + .get(0) + .setChunkMetadataLoader( + new DiskChunkMetadataLoader(resource, seriesPath, context, filter)); + for (int i = 1; i < timeSeriesMetadata.size(); i++) { + timeSeriesMetadata + .get(i) + .setChunkMetadataLoader( + new DiskChunkMetadataLoader(resource, subSensorList.get(i - 1), context, filter)); + } + vectorTimeSeriesMetadata = + new VectorTimeSeriesMetadata( + timeSeriesMetadata.get(0), + timeSeriesMetadata.subList(1, timeSeriesMetadata.size())); + } + } else { + vectorTimeSeriesMetadata = (VectorTimeSeriesMetadata) resource.getTimeSeriesMetadata(); + if (vectorTimeSeriesMetadata != null) { + vectorTimeSeriesMetadata.setChunkMetadataLoader( + new MemChunkMetadataLoader(resource, seriesPath, context, filter)); + } + } + + if (vectorTimeSeriesMetadata != null) { + List<Modification> pathModifications = + context.getPathModifications(resource.getModFile(), seriesPath); + vectorTimeSeriesMetadata.getTimeseriesMetadata().setModified(!pathModifications.isEmpty()); + if (vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime() + > vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) { + return null; + } + if (filter != null + && !filter.satisfyStartEndTime( + vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime(), + vectorTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) { + return null; + } + List<TimeseriesMetadata> valueTimeSeriesMetadataList = + vectorTimeSeriesMetadata.getValueTimeseriesMetadataList(); + for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { + pathModifications = + context.getPathModifications(resource.getModFile(), subSensorList.get(i)); + valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); + } + } + return vectorTimeSeriesMetadata; + } + /** * load all chunk metadata of one time series in one file. * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java index b508f57..6bb6ec1 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java @@ -19,6 +19,7 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; import java.io.IOException; import java.util.List; @@ -38,4 +39,6 @@ public interface ITimeSeriesMetadata { List<IChunkMetadata> loadChunkMetadataList() throws IOException; List<IChunkMetadata> getChunkMetadataList(); + + void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java index 67e6c0c..f82194d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java @@ -19,6 +19,7 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; import java.io.IOException; import java.util.ArrayList; @@ -50,6 +51,9 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata { @Override public void setModified(boolean modified) { timeseriesMetadata.setModified(modified); + for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { + subSensor.setModified(modified); + } } @Override @@ -60,6 +64,9 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata { @Override public void setSeq(boolean seq) { timeseriesMetadata.setSeq(seq); + for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { + subSensor.setSeq(seq); + } } @Override @@ -91,7 +98,16 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata { return null; } + @Override + public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) { + timeseriesMetadata.setChunkMetadataLoader(chunkMetadataLoader); + } + public List<TimeseriesMetadata> getValueTimeseriesMetadataList() { return valueTimeseriesMetadataList; } + + public TimeseriesMetadata getTimeseriesMetadata() { + return timeseriesMetadata; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 41d0cee..ed153cc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -366,6 +366,16 @@ public class TsFileSequenceReader implements AutoCloseable { } /** + * Find the leaf node that contains this vector, return all the needed subSensor and time column + * + * @return TimeseriesMetadata for the time column and all the needed subSensor, the order of the + * element in this list should be the same as subSensorList + */ + public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, List<String> subSensorList) { + return Collections.emptyList(); + } + + /** * Find the leaf node that contains path, return all the sensors in that leaf node which are also * in allSensors set */
