This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/addQueryMetrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7bf9d103a118406335d89fb5a84e4dc4db7842d1 Author: Minghui Liu <[email protected]> AuthorDate: Sun Dec 18 23:52:44 2022 +0800 add metrics: series_scan_cost --- .../iotdb/commons/service/metric/enums/Metric.java | 3 +- .../iotdb/commons/service/metric/enums/Tag.java | 3 +- .../apache/iotdb/db/engine/cache/ChunkCache.java | 49 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 154 +++--- .../operator/source/AlignedSeriesScanUtil.java | 1 + .../execution/operator/source/SeriesScanUtil.java | 359 +++++++------- .../iotdb/db/mpp/metric/QueryMetricsManager.java | 11 + .../db/mpp/metric/SeriesScanCostMetricSet.java | 542 +++++++++++++++++++++ .../query/reader/chunk/DiskAlignedChunkLoader.java | 38 +- .../db/query/reader/chunk/DiskChunkLoader.java | 23 +- .../query/reader/chunk/MemAlignedChunkLoader.java | 15 +- .../db/query/reader/chunk/MemChunkLoader.java | 15 +- .../metadata/DiskAlignedChunkMetadataLoader.java | 112 +++-- .../chunk/metadata/DiskChunkMetadataLoader.java | 118 +++-- .../metadata/MemAlignedChunkMetadataLoader.java | 75 +-- .../chunk/metadata/MemChunkMetadataLoader.java | 76 +-- .../db/service/metrics/DataNodeMetricsHelper.java | 2 + .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 267 +++++----- .../read/reader/chunk/AlignedChunkReader.java | 34 +- 19 files changed, 1344 insertions(+), 553 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 1c0ab35d31..e025895606 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -64,7 +64,8 @@ public enum Metric { STAGE, QUERY_PLAN_COST, OPERATOR_EXECUTION_COST, - OPERATOR_EXECUTION_COUNT; + OPERATOR_EXECUTION_COUNT, + SERIES_SCAN_COST; @Override public String toString() { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java index 65280a22a2..178937bec4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java @@ -24,7 +24,8 @@ public enum Tag { NAME, REGION, STATUS, - STAGE; + STAGE, + FROM; @Override public String toString() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java index 0fa63a5fdf..3cb94b4491 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -38,6 +39,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.READ_CHUNK_ALL; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.READ_CHUNK_FILE; + /** * This class is used to cache <code>Chunk</code> of <code>ChunkMetaData</code> in IoTDB. The * caching strategy is LRU. @@ -51,6 +55,8 @@ public class ChunkCache { config.getAllocateMemoryForChunkCache(); private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable(); + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + private final LoadingCache<ChunkMetadata, Chunk> lruCache; private final AtomicLong entryAverageSize = new AtomicLong(0); @@ -71,6 +77,7 @@ public class ChunkCache { .recordStats() .build( chunkMetadata -> { + long startTime = System.nanoTime(); try { TsFileSequenceReader reader = FileReaderManager.getInstance() @@ -79,6 +86,9 @@ public class ChunkCache { } catch (IOException e) { logger.error("Something wrong happened in reading {}", chunkMetadata, e); throw e; + } finally { + QUERY_METRICS.recordSeriesScanCost( + READ_CHUNK_FILE, System.nanoTime() - startTime); } }); @@ -99,29 +109,34 @@ public class ChunkCache { } public Chunk get(ChunkMetadata chunkMetaData, boolean debug) throws IOException { - if (!CACHE_ENABLE) { - TsFileSequenceReader reader = - FileReaderManager.getInstance() - .get(chunkMetaData.getFilePath(), chunkMetaData.isClosed()); - Chunk chunk = reader.readMemChunk(chunkMetaData); + long startTime = System.nanoTime(); + try { + if (!CACHE_ENABLE) { + TsFileSequenceReader reader = + FileReaderManager.getInstance() + .get(chunkMetaData.getFilePath(), chunkMetaData.isClosed()); + Chunk chunk = reader.readMemChunk(chunkMetaData); + return new Chunk( + chunk.getHeader(), + chunk.getData().duplicate(), + chunkMetaData.getDeleteIntervalList(), + chunkMetaData.getStatistics()); + } + + Chunk chunk = lruCache.get(chunkMetaData); + + if (debug) { + DEBUG_LOGGER.info("get chunk from cache whose meta data is: " + chunkMetaData); + } + return new Chunk( chunk.getHeader(), chunk.getData().duplicate(), chunkMetaData.getDeleteIntervalList(), chunkMetaData.getStatistics()); + } finally { + QUERY_METRICS.recordSeriesScanCost(READ_CHUNK_ALL, System.nanoTime() - startTime); } - - Chunk chunk = lruCache.get(chunkMetaData); - - if (debug) { - DEBUG_LOGGER.info("get chunk from cache whose meta data is: " + chunkMetaData); - } - - return new Chunk( - chunk.getHeader(), - chunk.getData().duplicate(), - chunkMetaData.getDeleteIntervalList(), - chunkMetaData.getStatistics()); } public double calculateChunkHitRatio() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java index d70b2faf41..936958dff4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -50,6 +51,9 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.READ_TIMESERIES_METADATA_CACHE; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.READ_TIMESERIES_METADATA_FILE; + /** * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching strategy is * LRU. @@ -63,6 +67,8 @@ public class TimeSeriesMetadataCache { config.getAllocateMemoryForTimeSeriesMetaDataCache(); private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable(); + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + private final Cache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache; private final AtomicLong entryAverageSize = new AtomicLong(0); @@ -118,84 +124,96 @@ public class TimeSeriesMetadataCache { boolean ignoreNotExists, boolean debug) throws IOException { - if (!CACHE_ENABLE) { - // bloom filter part - TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); - BloomFilter bloomFilter = reader.readBloomFilter(); - if (bloomFilter != null - && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { - return null; + long startTime = System.nanoTime(); + boolean cacheHit = true; + try { + if (!CACHE_ENABLE) { + cacheHit = false; + + // bloom filter part + TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); + BloomFilter bloomFilter = reader.readBloomFilter(); + if (bloomFilter != null + && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { + return null; + } + TimeseriesMetadata timeseriesMetadata = + reader.readTimeseriesMetadata( + new Path(key.device, key.measurement, true), ignoreNotExists); + return (timeseriesMetadata == null || timeseriesMetadata.getStatistics().getCount() == 0) + ? null + : timeseriesMetadata; } - TimeseriesMetadata timeseriesMetadata = - reader.readTimeseriesMetadata( - new Path(key.device, key.measurement, true), ignoreNotExists); - return (timeseriesMetadata == null || timeseriesMetadata.getStatistics().getCount() == 0) - ? null - : timeseriesMetadata; - } - TimeseriesMetadata timeseriesMetadata = lruCache.getIfPresent(key); + TimeseriesMetadata timeseriesMetadata = lruCache.getIfPresent(key); - if (timeseriesMetadata == null) { - if (debug) { - DEBUG_LOGGER.info( - "Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath); - DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors); - } - // allow for the parallelism of different devices - synchronized ( - devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) { - // double check - timeseriesMetadata = lruCache.getIfPresent(key); - if (timeseriesMetadata == null) { - Path path = new Path(key.device, key.measurement, true); - // bloom filter part - BloomFilter bloomFilter = - BloomFilterCache.getInstance() - .get(new BloomFilterCache.BloomFilterCacheKey(key.filePath), debug); - if (bloomFilter != null) { - bloomFilterRequestCount.incrementAndGet(); - if (!bloomFilter.contains(path.getFullPath())) { - bloomFilterPreventCount.incrementAndGet(); - if (debug) { - DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key); + if (timeseriesMetadata == null) { + if (debug) { + DEBUG_LOGGER.info( + "Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath); + DEBUG_LOGGER.info("Device: {}, all sensors: {}", key.device, allSensors); + } + // allow for the parallelism of different devices + synchronized ( + devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) { + // double check + timeseriesMetadata = lruCache.getIfPresent(key); + if (timeseriesMetadata == null) { + cacheHit = false; + + Path path = new Path(key.device, key.measurement, true); + // bloom filter part + BloomFilter bloomFilter = + BloomFilterCache.getInstance() + .get(new BloomFilterCache.BloomFilterCacheKey(key.filePath), debug); + if (bloomFilter != null) { + bloomFilterRequestCount.incrementAndGet(); + if (!bloomFilter.contains(path.getFullPath())) { + bloomFilterPreventCount.incrementAndGet(); + if (debug) { + DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key); + } + return null; } - return null; } - } - TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); - List<TimeseriesMetadata> timeSeriesMetadataList = - reader.readTimeseriesMetadata(path, allSensors); - // put TimeSeriesMetadata of all sensors used in this query into cache - for (TimeseriesMetadata metadata : timeSeriesMetadataList) { - TimeSeriesMetadataCacheKey k = - new TimeSeriesMetadataCacheKey( - key.filePath, key.device, metadata.getMeasurementId()); - if (metadata.getStatistics().getCount() != 0) { - lruCache.put(k, metadata); - } - if (metadata.getMeasurementId().equals(key.measurement)) { - timeseriesMetadata = metadata.getStatistics().getCount() == 0 ? null : metadata; + TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true); + List<TimeseriesMetadata> timeSeriesMetadataList = + reader.readTimeseriesMetadata(path, allSensors); + // put TimeSeriesMetadata of all sensors used in this query into cache + for (TimeseriesMetadata metadata : timeSeriesMetadataList) { + TimeSeriesMetadataCacheKey k = + new TimeSeriesMetadataCacheKey( + key.filePath, key.device, metadata.getMeasurementId()); + if (metadata.getStatistics().getCount() != 0) { + lruCache.put(k, metadata); + } + if (metadata.getMeasurementId().equals(key.measurement)) { + timeseriesMetadata = metadata.getStatistics().getCount() == 0 ? null : metadata; + } } } } } - } - if (timeseriesMetadata == null) { - if (debug) { - DEBUG_LOGGER.info("The file doesn't have this time series {}.", key); - } - return null; - } else { - if (debug) { - DEBUG_LOGGER.info( - "Get timeseries: {}.{} metadata in file: {} from cache: {}.", - key.device, - key.measurement, - key.filePath, - timeseriesMetadata); + if (timeseriesMetadata == null) { + if (debug) { + DEBUG_LOGGER.info("The file doesn't have this time series {}.", key); + } + return null; + } else { + if (debug) { + DEBUG_LOGGER.info( + "Get timeseries: {}.{} metadata in file: {} from cache: {}.", + key.device, + key.measurement, + key.filePath, + timeseriesMetadata); + } + return new TimeseriesMetadata(timeseriesMetadata); } - return new TimeseriesMetadata(timeseriesMetadata); + } finally { + QUERY_METRICS.recordSeriesScanCost( + cacheHit ? READ_TIMESERIES_METADATA_CACHE : READ_TIMESERIES_METADATA_FILE, + System.nanoTime() - startTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java index f41456b063..52e514eee4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java @@ -57,6 +57,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { dataTypes = ((AlignedPath) seriesPath) .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); + isAligned = true; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 22fd2cbabc..35927976d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -24,7 +24,10 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.metadata.idtable.IDTable; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.chunk.MemAlignedPageReader; +import org.apache.iotdb.db.query.reader.chunk.MemPageReader; import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader; import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.utils.FileLoaderUtils; @@ -58,19 +61,26 @@ import java.util.function.ToLongFunction; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM; public class SeriesScanUtil { private final FragmentInstanceContext context; // The path of the target series which will be scanned. private final PartialPath seriesPath; + protected boolean isAligned = false; // all the sensors in this device; protected final Set<String> allSensors; protected final TSDataType dataType; // inner class of SeriesReader for order purpose - private TimeOrderUtils orderUtils; + private final TimeOrderUtils orderUtils; /* * There is at most one is not null between timeFilter and valueFilter @@ -121,6 +131,8 @@ public class SeriesScanUtil { protected boolean hasCachedNextOverlappedPage; protected TsBlock cachedTsBlock; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public SeriesScanUtil( PartialPath seriesPath, Set<String> allSensors, @@ -642,177 +654,187 @@ public class SeriesScanUtil { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private boolean hasNextOverlappedPage() throws IOException { + long startTime = System.nanoTime(); + try { + if (hasCachedNextOverlappedPage) { + return true; + } - if (hasCachedNextOverlappedPage) { - return true; - } - - tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); - - while (true) { + tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); - // may has overlapped data - if (mergeReader.hasNextTimeValuePair()) { + while (true) { - // TODO we still need to consider data type, ascending and descending here - TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); - TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); - long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); - while (mergeReader.hasNextTimeValuePair()) { + // may has overlapped data + if (mergeReader.hasNextTimeValuePair()) { - /* - * get current first point in mergeReader, this maybe overlapped later - */ - TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); + // TODO we still need to consider data type, ascending and descending here + TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); + TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); + long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + while (mergeReader.hasNextTimeValuePair()) { - if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) { /* - * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime - * 1. has cached batch data, we don't need to read more data, just use the cached data later - * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), - * we could just use the first page reader later - * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), - * we could use the first sequence page reader later + * get current first point in mergeReader, this maybe overlapped later */ - if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) { - break; + TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); + + if (orderUtils.isExcessEndpoint( + timeValuePair.getTimestamp(), currentPageEndPointTime)) { + /* + * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime + * 1. has cached batch data, we don't need to read more data, just use the cached data later + * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), + * we could just use the first page reader later + * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), + * we could use the first sequence page reader later + */ + if (!builder.isEmpty() || firstPageReader != null || !seqPageReaders.isEmpty()) { + break; + } + // so, we don't have other data except mergeReader + currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); } - // so, we don't have other data except mergeReader - currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); - } - // unpack all overlapped data for the first timeValuePair - unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); - unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( - timeValuePair.getTimestamp(), false); - unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); - unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); - - // update if there are unpacked unSeqPageReaders - timeValuePair = mergeReader.currentTimeValuePair(); - - // from now, the unsequence reader is all unpacked, so we don't need to consider it - // we has first page reader now - if (firstPageReader != null) { - // if current timeValuePair excesses the first page reader's end time, we just use the - // cached data - if ((orderUtils.getAscending() - && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime()) - || (!orderUtils.getAscending() - && timeValuePair.getTimestamp() - < firstPageReader.getStatistics().getStartTime())) { - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - return hasCachedNextOverlappedPage; - } else if (orderUtils.isOverlapped( - timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { - // current timeValuePair is overlapped with firstPageReader, add it to merged reader - // and update endTime to the max end time - mergeReader.addReader( - getPointReader( - firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), - firstPageReader.version, - orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), - context); - currentPageEndPointTime = - updateEndPointTime(currentPageEndPointTime, firstPageReader); - firstPageReader = null; + // unpack all overlapped data for the first timeValuePair + unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); + unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( + timeValuePair.getTimestamp(), false); + unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); + unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); + + // update if there are unpacked unSeqPageReaders + timeValuePair = mergeReader.currentTimeValuePair(); + + // from now, the unsequence reader is all unpacked, so we don't need to consider it + // we has first page reader now + if (firstPageReader != null) { + // if current timeValuePair excesses the first page reader's end time, we just use the + // cached data + if ((orderUtils.getAscending() + && timeValuePair.getTimestamp() + > firstPageReader.getStatistics().getEndTime()) + || (!orderUtils.getAscending() + && timeValuePair.getTimestamp() + < firstPageReader.getStatistics().getStartTime())) { + hasCachedNextOverlappedPage = !builder.isEmpty(); + cachedTsBlock = builder.build(); + return hasCachedNextOverlappedPage; + } else if (orderUtils.isOverlapped( + timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { + // current timeValuePair is overlapped with firstPageReader, add it to merged reader + // and update endTime to the max end time + mergeReader.addReader( + getPointReader( + firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), + firstPageReader.version, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), + context); + currentPageEndPointTime = + updateEndPointTime(currentPageEndPointTime, firstPageReader); + firstPageReader = null; + } } - } - // the seq page readers is not empty, just like first page reader - if (!seqPageReaders.isEmpty()) { - if ((orderUtils.getAscending() - && timeValuePair.getTimestamp() - > seqPageReaders.get(0).getStatistics().getEndTime()) - || (!orderUtils.getAscending() - && timeValuePair.getTimestamp() - < seqPageReaders.get(0).getStatistics().getStartTime())) { - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - return hasCachedNextOverlappedPage; - } else if (orderUtils.isOverlapped( - timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { - VersionPageReader pageReader = seqPageReaders.remove(0); - mergeReader.addReader( - getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), - pageReader.version, - orderUtils.getOverlapCheckTime(pageReader.getStatistics()), - context); - currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); + // the seq page readers is not empty, just like first page reader + if (!seqPageReaders.isEmpty()) { + if ((orderUtils.getAscending() + && timeValuePair.getTimestamp() + > seqPageReaders.get(0).getStatistics().getEndTime()) + || (!orderUtils.getAscending() + && timeValuePair.getTimestamp() + < seqPageReaders.get(0).getStatistics().getStartTime())) { + hasCachedNextOverlappedPage = !builder.isEmpty(); + cachedTsBlock = builder.build(); + return hasCachedNextOverlappedPage; + } else if (orderUtils.isOverlapped( + timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { + VersionPageReader pageReader = seqPageReaders.remove(0); + mergeReader.addReader( + getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), + pageReader.version, + orderUtils.getOverlapCheckTime(pageReader.getStatistics()), + context); + currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); + } } - } - /* - * get the latest first point in mergeReader - */ - timeValuePair = mergeReader.nextTimeValuePair(); + /* + * get the latest first point in mergeReader + */ + timeValuePair = mergeReader.nextTimeValuePair(); - Object valueForFilter = timeValuePair.getValue().getValue(); + Object valueForFilter = timeValuePair.getValue().getValue(); - // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will - // only accept AlignedPath with only one sub sensor - if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { - for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { - if (tsPrimitiveType != null) { - valueForFilter = tsPrimitiveType.getValue(); - break; + // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will + // only accept AlignedPath with only one sub sensor + if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { + for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { + if (tsPrimitiveType != null) { + valueForFilter = tsPrimitiveType.getValue(); + break; + } } } - } - if (valueFilter == null - || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { - timeBuilder.writeLong(timeValuePair.getTimestamp()); - switch (dataType) { - case BOOLEAN: - builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); - break; - case INT32: - builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); - break; - case INT64: - builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); - break; - case FLOAT: - builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); - break; - case DOUBLE: - builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); - break; - case TEXT: - builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); - break; - case VECTOR: - TsPrimitiveType[] values = timeValuePair.getValue().getVector(); - for (int i = 0; i < values.length; i++) { - if (values[i] == null) { - builder.getColumnBuilder(i).appendNull(); - } else { - builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); + if (valueFilter == null + || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { + timeBuilder.writeLong(timeValuePair.getTimestamp()); + switch (dataType) { + case BOOLEAN: + builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean()); + break; + case INT32: + builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt()); + break; + case INT64: + builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong()); + break; + case FLOAT: + builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat()); + break; + case DOUBLE: + builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble()); + break; + case TEXT: + builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary()); + break; + case VECTOR: + TsPrimitiveType[] values = timeValuePair.getValue().getVector(); + for (int i = 0; i < values.length; i++) { + if (values[i] == null) { + builder.getColumnBuilder(i).appendNull(); + } else { + builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); + } } - } - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + builder.declarePosition(); } - builder.declarePosition(); } - } - hasCachedNextOverlappedPage = !builder.isEmpty(); - cachedTsBlock = builder.build(); - /* - * if current overlapped page has valid data, return, otherwise read next overlapped page - */ - if (hasCachedNextOverlappedPage) { - return true; - } else if (mergeReader.hasNextTimeValuePair()) { - // condition: seqPage.endTime < mergeReader.currentTime + hasCachedNextOverlappedPage = !builder.isEmpty(); + cachedTsBlock = builder.build(); + /* + * if current overlapped page has valid data, return, otherwise read next overlapped page + */ + if (hasCachedNextOverlappedPage) { + return true; + } else if (mergeReader.hasNextTimeValuePair()) { + // condition: seqPage.endTime < mergeReader.currentTime + return false; + } + } else { return false; } - } else { - return false; } + } finally { + QUERY_METRICS.recordSeriesScanCost( + isAligned + ? BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED + : BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED, + System.nanoTime() - startTime); } } @@ -1084,25 +1106,21 @@ public class SeriesScanUtil { return timeFilter; } - public TimeOrderUtils getOrderUtils() { - return orderUtils; - } + protected static class VersionPageReader { - protected class VersionPageReader { + private final PriorityMergeReader.MergeReaderPriority version; + private final IPageReader data; - protected PriorityMergeReader.MergeReaderPriority version; - protected IPageReader data; - - protected boolean isSeq; + private final boolean isSeq; + private final boolean isAligned; + private final boolean isMem; VersionPageReader(long version, long offset, IPageReader data, boolean isSeq) { this.version = new PriorityMergeReader.MergeReaderPriority(version, offset); this.data = data; this.isSeq = isSeq; - } - - public boolean isAlignedPageReader() { - return data instanceof IAlignedPageReader; + this.isAligned = data instanceof IAlignedPageReader; + this.isMem = data instanceof MemPageReader || data instanceof MemAlignedPageReader; } Statistics getStatistics() { @@ -1124,11 +1142,24 @@ public class SeriesScanUtil { } TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException { - TsBlock tsBlock = data.getAllSatisfiedData(); - if (!ascending) { - tsBlock.reverse(); + long startTime = System.nanoTime(); + try { + TsBlock tsBlock = data.getAllSatisfiedData(); + if (!ascending) { + tsBlock.reverse(); + } + return tsBlock; + } finally { + QUERY_METRICS.recordSeriesScanCost( + isAligned + ? (isMem + ? BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM + : BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_DISK) + : (isMem + ? BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM + : BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK), + System.nanoTime() - startTime); } - return tsBlock; } void setFilter(Filter filter) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java index dac48b1680..9292af0885 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.metric; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.utils.MetricInfo; import org.apache.iotdb.metrics.utils.MetricLevel; import java.util.concurrent.TimeUnit; @@ -59,6 +60,16 @@ public class QueryMetricsManager { operatorType); } + public void recordSeriesScanCost(String stage, long costTimeInNanos) { + MetricInfo metricInfo = SeriesScanCostMetricSet.metricInfoMap.get(stage); + metricService.timer( + costTimeInNanos, + TimeUnit.NANOSECONDS, + metricInfo.getName(), + MetricLevel.IMPORTANT, + metricInfo.getTagsInArray()); + } + public static QueryMetricsManager getInstance() { return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/SeriesScanCostMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/SeriesScanCostMetricSet.java new file mode 100644 index 0000000000..06920a69ab --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/SeriesScanCostMetricSet.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.metric; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.utils.MetricInfo; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +import java.util.HashMap; +import java.util.Map; + +public class SeriesScanCostMetricSet implements IMetricSet { + + private static final String metric = Metric.SERIES_SCAN_COST.toString(); + + public static final Map<String, MetricInfo> metricInfoMap = new HashMap<>(); + + public static final String LOAD_TIMESERIES_METADATA_ALIGNED_MEM = + "load_timeseries_metadata_aligned_mem"; + public static final String LOAD_TIMESERIES_METADATA_ALIGNED_DISK = + "load_timeseries_metadata_aligned_disk"; + public static final String LOAD_TIMESERIES_METADATA_NONALIGNED_MEM = + "load_timeseries_metadata_nonaligned_mem"; + public static final String LOAD_TIMESERIES_METADATA_NONALIGNED_DISK = + "load_timeseries_metadata_nonaligned_disk"; + + static { + metricInfoMap.put( + LOAD_TIMESERIES_METADATA_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_timeseries_metadata", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + LOAD_TIMESERIES_METADATA_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_timeseries_metadata", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + LOAD_TIMESERIES_METADATA_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_timeseries_metadata", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + LOAD_TIMESERIES_METADATA_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_timeseries_metadata", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String READ_TIMESERIES_METADATA_CACHE = "read_timeseries_metadata_cache"; + public static final String READ_TIMESERIES_METADATA_FILE = "read_timeseries_metadata_file"; + + static { + metricInfoMap.put( + READ_TIMESERIES_METADATA_CACHE, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "read_timeseries_metadata", + Tag.FROM.toString(), + "cache")); + metricInfoMap.put( + READ_TIMESERIES_METADATA_FILE, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "read_timeseries_metadata", + Tag.FROM.toString(), + "file")); + } + + public static final String TIMESERIES_METADATA_MODIFICATION_ALIGNED = + "timeseries_metadata_modification_aligned"; + public static final String TIMESERIES_METADATA_MODIFICATION_NONALIGNED = + "timeseries_metadata_modification_nonaligned"; + + static { + metricInfoMap.put( + TIMESERIES_METADATA_MODIFICATION_ALIGNED, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "timeseries_metadata_modification", + Tag.TYPE.toString(), + "aligned")); + metricInfoMap.put( + TIMESERIES_METADATA_MODIFICATION_NONALIGNED, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "timeseries_metadata_modification", + Tag.TYPE.toString(), + "non_aligned")); + } + + public static final String LOAD_CHUNK_METADATA_LIST_ALIGNED_MEM = + "load_chunk_metadata_list_aligned_mem"; + public static final String LOAD_CHUNK_METADATA_LIST_ALIGNED_DISK = + "load_chunk_metadata_list_aligned_disk"; + public static final String LOAD_CHUNK_METADATA_LIST_NONALIGNED_MEM = + "load_chunk_metadata_list_nonaligned_mem"; + public static final String LOAD_CHUNK_METADATA_LIST_NONALIGNED_DISK = + "load_chunk_metadata_list_nonaligned_disk"; + + static { + metricInfoMap.put( + LOAD_CHUNK_METADATA_LIST_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_chunk_metadata_list", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + LOAD_CHUNK_METADATA_LIST_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_chunk_metadata_list", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + LOAD_CHUNK_METADATA_LIST_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_chunk_metadata_list", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + LOAD_CHUNK_METADATA_LIST_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "load_chunk_metadata_list", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String CHUNK_METADATA_MODIFICATION_ALIGNED_MEM = + "chunk_metadata_modification_aligned_mem"; + public static final String CHUNK_METADATA_MODIFICATION_ALIGNED_DISK = + "chunk_metadata_modification_aligned_disk"; + public static final String CHUNK_METADATA_MODIFICATION_NONALIGNED_MEM = + "chunk_metadata_modification_nonaligned_mem"; + public static final String CHUNK_METADATA_MODIFICATION_NONALIGNED_DISK = + "chunk_metadata_modification_nonaligned_disk"; + + static { + metricInfoMap.put( + CHUNK_METADATA_MODIFICATION_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_modification", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CHUNK_METADATA_MODIFICATION_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_modification", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + CHUNK_METADATA_MODIFICATION_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_modification", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CHUNK_METADATA_MODIFICATION_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_modification", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String CHUNK_METADATA_FILTER_ALIGNED_MEM = + "chunk_metadata_filter_aligned_mem"; + public static final String CHUNK_METADATA_FILTER_ALIGNED_DISK = + "chunk_metadata_filter_aligned_disk"; + public static final String CHUNK_METADATA_FILTER_NONALIGNED_MEM = + "chunk_metadata_filter_nonaligned_mem"; + public static final String CHUNK_METADATA_FILTER_NONALIGNED_DISK = + "chunk_metadata_filter_nonaligned_disk"; + + static { + metricInfoMap.put( + CHUNK_METADATA_FILTER_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_filter", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CHUNK_METADATA_FILTER_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_filter", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + CHUNK_METADATA_FILTER_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_filter", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CHUNK_METADATA_FILTER_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "chunk_metadata_filter", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String CONSTRUCT_CHUNK_READER_ALIGNED_MEM = + "construct_chunk_reader_aligned_mem"; + public static final String CONSTRUCT_CHUNK_READER_ALIGNED_DISK = + "construct_chunk_reader_aligned_disk"; + public static final String CONSTRUCT_CHUNK_READER_NONALIGNED_MEM = + "construct_chunk_reader_nonaligned_mem"; + public static final String CONSTRUCT_CHUNK_READER_NONALIGNED_DISK = + "construct_chunk_reader_nonaligned_disk"; + + static { + metricInfoMap.put( + CONSTRUCT_CHUNK_READER_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "construct_chunk_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CONSTRUCT_CHUNK_READER_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "construct_chunk_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + CONSTRUCT_CHUNK_READER_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "construct_chunk_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + CONSTRUCT_CHUNK_READER_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "construct_chunk_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String READ_CHUNK_ALL = "read_chunk_all"; + public static final String READ_CHUNK_FILE = "read_chunk_file"; + + static { + metricInfoMap.put( + READ_CHUNK_ALL, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "read_chunk", + Tag.FROM.toString(), + "all")); + metricInfoMap.put( + READ_CHUNK_FILE, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "read_chunk", + Tag.FROM.toString(), + "file")); + } + + public static final String INIT_CHUNK_READER_ALIGNED_MEM = "init_chunk_reader_aligned_mem"; + public static final String INIT_CHUNK_READER_ALIGNED_DISK = "init_chunk_reader_aligned_disk"; + public static final String INIT_CHUNK_READER_NONALIGNED_MEM = "init_chunk_reader_nonaligned_mem"; + public static final String INIT_CHUNK_READER_NONALIGNED_DISK = + "init_chunk_reader_nonaligned_disk"; + + static { + metricInfoMap.put( + INIT_CHUNK_READER_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "init_chunk_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + INIT_CHUNK_READER_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "init_chunk_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + INIT_CHUNK_READER_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "init_chunk_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + INIT_CHUNK_READER_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "init_chunk_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM = + "build_tsblock_from_page_reader_aligned_mem"; + public static final String BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_DISK = + "build_tsblock_from_page_reader_aligned_disk"; + public static final String BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM = + "build_tsblock_from_page_reader_nonaligned_mem"; + public static final String BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK = + "build_tsblock_from_page_reader_nonaligned_disk"; + + static { + metricInfoMap.put( + BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_page_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + BUILD_TSBLOCK_FROM_PAGE_READER_ALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_page_reader", + Tag.TYPE.toString(), + "aligned", + Tag.FROM.toString(), + "disk")); + metricInfoMap.put( + BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_MEM, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_page_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "mem")); + metricInfoMap.put( + BUILD_TSBLOCK_FROM_PAGE_READER_NONALIGNED_DISK, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_page_reader", + Tag.TYPE.toString(), + "non_aligned", + Tag.FROM.toString(), + "disk")); + } + + public static final String BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED = + "build_tsblock_from_merge_reader_aligned"; + public static final String BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED = + "build_tsblock_from_merge_reader_nonaligned"; + + static { + metricInfoMap.put( + BUILD_TSBLOCK_FROM_MERGE_READER_ALIGNED, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_merge_reader", + Tag.TYPE.toString(), + "aligned")); + metricInfoMap.put( + BUILD_TSBLOCK_FROM_MERGE_READER_NONALIGNED, + new MetricInfo( + MetricType.TIMER, + metric, + Tag.STAGE.toString(), + "build_tsblock_from_merge_reader", + Tag.TYPE.toString(), + "non_aligned")); + } + + @Override + public void bindTo(AbstractMetricService metricService) { + for (MetricInfo metricInfo : metricInfoMap.values()) { + metricService.getOrCreateTimer( + metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray()); + } + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + for (MetricInfo metricInfo : metricInfoMap.values()) { + metricService.remove(MetricType.TIMER, metric, metricInfo.getTagsInArray()); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java index 3487682a2e..b433641af0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -32,9 +33,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_ALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_ALIGNED_DISK; + public class DiskAlignedChunkLoader implements IChunkLoader { private final boolean debug; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); public DiskAlignedChunkLoader(boolean debug) { this.debug = debug; @@ -51,17 +56,28 @@ public class DiskAlignedChunkLoader implements IChunkLoader { @Override public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException { - AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData; - Chunk timeChunk = - ChunkCache.getInstance() - .get((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(), debug); - List<Chunk> valueChunkList = new ArrayList<>(); - for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { - valueChunkList.add( - valueChunkMetadata == null - ? null - : ChunkCache.getInstance().get((ChunkMetadata) valueChunkMetadata, debug)); + long t1 = System.nanoTime(); + try { + AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData; + Chunk timeChunk = + ChunkCache.getInstance() + .get((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(), debug); + List<Chunk> valueChunkList = new ArrayList<>(); + for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { + valueChunkList.add( + valueChunkMetadata == null + ? null + : ChunkCache.getInstance().get((ChunkMetadata) valueChunkMetadata, debug)); + } + + long t2 = System.nanoTime(); + IChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_DISK, System.nanoTime() - t2); + + return chunkReader; + } finally { + QUERY_METRICS.recordSeriesScanCost( + CONSTRUCT_CHUNK_READER_ALIGNED_DISK, System.nanoTime() - t1); } - return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java index 2f926a7ce0..85ee83bee4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -30,11 +31,16 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_NONALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_NONALIGNED_DISK; + /** To read one chunk from disk, and only used in iotdb server module */ public class DiskChunkLoader implements IChunkLoader { private final boolean debug; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public DiskChunkLoader(boolean debug) { this.debug = debug; } @@ -52,8 +58,19 @@ public class DiskChunkLoader implements IChunkLoader { @Override public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException { - Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata) chunkMetaData, debug); - chunk.setFromOldFile(chunkMetaData.isFromOldTsFile()); - return new ChunkReader(chunk, timeFilter); + long t1 = System.nanoTime(); + try { + Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata) chunkMetaData, debug); + chunk.setFromOldFile(chunkMetaData.isFromOldTsFile()); + + long t2 = System.nanoTime(); + IChunkReader chunkReader = new ChunkReader(chunk, timeFilter); + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_DISK, System.nanoTime() - t2); + + return chunkReader; + } finally { + QUERY_METRICS.recordSeriesScanCost( + CONSTRUCT_CHUNK_READER_NONALIGNED_DISK, System.nanoTime() - t1); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java index b307af1b06..f134618025 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkLoader.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -26,11 +27,16 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_ALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_ALIGNED_MEM; + /** To read one aligned chunk from memory, and only used in iotdb server module */ public class MemAlignedChunkLoader implements IChunkLoader { private final AlignedReadOnlyMemChunk chunk; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public MemAlignedChunkLoader(AlignedReadOnlyMemChunk chunk) { this.chunk = chunk; } @@ -47,6 +53,13 @@ public class MemAlignedChunkLoader implements IChunkLoader { @Override public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) { - return new MemAlignedChunkReader(chunk, timeFilter); + long startTime = System.nanoTime(); + try { + return new MemAlignedChunkReader(chunk, timeFilter); + } finally { + long duration = System.nanoTime() - startTime; + QUERY_METRICS.recordSeriesScanCost(CONSTRUCT_CHUNK_READER_ALIGNED_MEM, duration); + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_ALIGNED_MEM, duration); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java index eff76f62db..842d2bd394 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -27,11 +28,16 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CONSTRUCT_CHUNK_READER_NONALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.INIT_CHUNK_READER_NONALIGNED_MEM; + /** To read one chunk from memory, and only used in iotdb server module */ public class MemChunkLoader implements IChunkLoader { private final ReadOnlyMemChunk chunk; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public MemChunkLoader(ReadOnlyMemChunk chunk) { this.chunk = chunk; } @@ -48,6 +54,13 @@ public class MemChunkLoader implements IChunkLoader { @Override public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) { - return new MemChunkReader(chunk, timeFilter); + long startTime = System.nanoTime(); + try { + return new MemChunkReader(chunk, timeFilter); + } finally { + long duration = System.nanoTime() - startTime; + QUERY_METRICS.recordSeriesScanCost(CONSTRUCT_CHUNK_READER_NONALIGNED_MEM, duration); + QUERY_METRICS.recordSeriesScanCost(INIT_CHUNK_READER_NONALIGNED_MEM, duration); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index 58d4a1c98a..9e8ec7ae4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader; import org.apache.iotdb.db.utils.QueryUtils; @@ -37,6 +38,10 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_FILTER_ALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_MODIFICATION_ALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_CHUNK_METADATA_LIST_ALIGNED_DISK; + public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; @@ -46,6 +51,7 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final Filter filter; private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); public DiskAlignedChunkMetadataLoader( TsFileResource resource, AlignedPath seriesPath, QueryContext context, Filter filter) { @@ -57,53 +63,65 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { - List<AlignedChunkMetadata> alignedChunkMetadataList = - ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); - - // get all sub sensors' modifications - List<List<Modification>> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - - if (context.isDebug()) { - DEBUG_LOGGER.info( - "Modifications size is {} for file Path: {} ", - pathModifications.size(), - resource.getTsFilePath()); - pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - // remove ChunkMetadata that have been deleted - QueryUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, pathModifications); - - if (context.isDebug()) { - DEBUG_LOGGER.info("After modification Chunk meta data list is: "); - alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + long t1 = System.nanoTime(); + try { + List<AlignedChunkMetadata> alignedChunkMetadataList = + ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); + + long t2 = System.nanoTime(); + // get all sub sensors' modifications + List<List<Modification>> pathModifications = + context.getPathModifications(resource.getModFile(), seriesPath); + + if (context.isDebug()) { + DEBUG_LOGGER.info( + "Modifications size is {} for file Path: {} ", + pathModifications.size(), + resource.getTsFilePath()); + pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } + + // remove ChunkMetadata that have been deleted + QueryUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, pathModifications); + + if (context.isDebug()) { + DEBUG_LOGGER.info("After modification Chunk meta data list is: "); + alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } + QUERY_METRICS.recordSeriesScanCost( + CHUNK_METADATA_MODIFICATION_ALIGNED_DISK, System.nanoTime() - t2); + + // remove not satisfied ChunkMetaData + long t3 = System.nanoTime(); + alignedChunkMetadataList.removeIf( + alignedChunkMetaData -> + (filter != null + && !filter.satisfyStartEndTime( + alignedChunkMetaData.getStartTime(), alignedChunkMetaData.getEndTime())) + || alignedChunkMetaData.getStartTime() > alignedChunkMetaData.getEndTime()); + QUERY_METRICS.recordSeriesScanCost( + CHUNK_METADATA_FILTER_ALIGNED_DISK, System.nanoTime() - t3); + + // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is + // very cheap. + alignedChunkMetadataList.forEach( + chunkMetadata -> { + if (chunkMetadata.needSetChunkLoader()) { + chunkMetadata.setFilePath(resource.getTsFilePath()); + chunkMetadata.setClosed(resource.isClosed()); + chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); + } + }); + + if (context.isDebug()) { + DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); + alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } + + return new ArrayList<>(alignedChunkMetadataList); + } finally { + QUERY_METRICS.recordSeriesScanCost( + LOAD_CHUNK_METADATA_LIST_ALIGNED_DISK, System.nanoTime() - t1); } - - // remove not satisfied ChunkMetaData - alignedChunkMetadataList.removeIf( - alignedChunkMetaData -> - (filter != null - && !filter.satisfyStartEndTime( - alignedChunkMetaData.getStartTime(), alignedChunkMetaData.getEndTime())) - || alignedChunkMetaData.getStartTime() > alignedChunkMetaData.getEndTime()); - - // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is - // very cheap. - alignedChunkMetadataList.forEach( - chunkMetadata -> { - if (chunkMetadata.needSetChunkLoader()) { - chunkMetadata.setFilePath(resource.getTsFilePath()); - chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); - } - }); - - if (context.isDebug()) { - DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); - alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - return new ArrayList<>(alignedChunkMetadataList); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java index 7b8856ec69..e45d976caa 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader; import org.apache.iotdb.db.utils.QueryUtils; @@ -35,6 +36,10 @@ import org.slf4j.LoggerFactory; import java.util.List; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_FILTER_NONALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_MODIFICATION_NONALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_CHUNK_METADATA_LIST_NONALIGNED_DISK; + public class DiskChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; @@ -44,6 +49,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { private final Filter filter; private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); public DiskChunkMetadataLoader( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter filter) { @@ -55,63 +61,73 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { + long t1 = System.nanoTime(); + try { + List<IChunkMetadata> chunkMetadataList = + ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); + + long t2 = System.nanoTime(); + List<Modification> pathModifications = + context.getPathModifications(resource.getModFile(), seriesPath); + + if (context.isDebug()) { + DEBUG_LOGGER.info( + "Modifications size is {} for file Path: {} ", + pathModifications.size(), + resource.getTsFilePath()); + pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } - List<IChunkMetadata> chunkMetadataList = - ((TimeseriesMetadata) timeSeriesMetadata).getCopiedChunkMetadataList(); - - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - - if (context.isDebug()) { - DEBUG_LOGGER.info( - "Modifications size is {} for file Path: {} ", - pathModifications.size(), - resource.getTsFilePath()); - pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } + if (!pathModifications.isEmpty()) { + QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications); + } - if (!pathModifications.isEmpty()) { - QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications); - } + if (context.isDebug()) { + DEBUG_LOGGER.info("After modification Chunk meta data list is: "); + chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } - if (context.isDebug()) { - DEBUG_LOGGER.info("After modification Chunk meta data list is: "); - chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } + QUERY_METRICS.recordSeriesScanCost( + CHUNK_METADATA_MODIFICATION_NONALIGNED_DISK, System.nanoTime() - t2); + + // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is + // very cheap. + chunkMetadataList.forEach( + chunkMetadata -> { + if (chunkMetadata.needSetChunkLoader()) { + chunkMetadata.setFilePath(resource.getTsFilePath()); + chunkMetadata.setClosed(resource.isClosed()); + chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); + } + }); + + // remove not satisfied ChunkMetaData + long t3 = System.nanoTime(); + chunkMetadataList.removeIf( + chunkMetaData -> + (filter != null + && !filter.satisfyStartEndTime( + chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) + || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); + QUERY_METRICS.recordSeriesScanCost( + CHUNK_METADATA_FILTER_NONALIGNED_DISK, System.nanoTime() - t3); + + // For chunkMetadata from old TsFile, do not set version + for (IChunkMetadata metadata : chunkMetadataList) { + if (!metadata.isFromOldTsFile()) { + metadata.setVersion(resource.getVersion()); + } + } - // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is - // very cheap. - chunkMetadataList.forEach( - chunkMetadata -> { - if (chunkMetadata.needSetChunkLoader()) { - chunkMetadata.setFilePath(resource.getTsFilePath()); - chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); - } - }); - - /* - * remove not satisfied ChunkMetaData - */ - chunkMetadataList.removeIf( - chunkMetaData -> - (filter != null - && !filter.satisfyStartEndTime( - chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) - || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); - - // For chunkMetadata from old TsFile, do not set version - for (IChunkMetadata metadata : chunkMetadataList) { - if (!metadata.isFromOldTsFile()) { - metadata.setVersion(resource.getVersion()); + if (context.isDebug()) { + DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); + chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); } - } - if (context.isDebug()) { - DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); - chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + return chunkMetadataList; + } finally { + QUERY_METRICS.recordSeriesScanCost( + LOAD_CHUNK_METADATA_LIST_NONALIGNED_DISK, System.nanoTime() - t1); } - - return chunkMetadataList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java index 6595b2f0c1..8b6dfda2d3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -30,6 +31,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.util.List; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_FILTER_ALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_CHUNK_METADATA_LIST_ALIGNED_MEM; + public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; @@ -37,6 +41,8 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final QueryContext context; private final Filter timeFilter; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public MemAlignedChunkMetadataLoader( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter timeFilter) { this.resource = resource; @@ -47,43 +53,50 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { + long t1 = System.nanoTime(); + try { + // There is no need to apply modifications to these, because we already do that while + // generating it in TSP + List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath); - // There is no need to apply modifications to these, because we already do that while generating - // it in TSP - List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath); + chunkMetadataList.forEach( + chunkMetadata -> { + if (chunkMetadata.needSetChunkLoader()) { + chunkMetadata.setFilePath(resource.getTsFilePath()); + chunkMetadata.setClosed(resource.isClosed()); + chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); + } + }); - chunkMetadataList.forEach( - chunkMetadata -> { - if (chunkMetadata.needSetChunkLoader()) { - chunkMetadata.setFilePath(resource.getTsFilePath()); - chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); + // There is no need to set IChunkLoader for it, because the MemChunkLoader has already been + // set + // while creating ReadOnlyMemChunk + List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath); + if (memChunks != null) { + for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { + if (!memChunks.isEmpty()) { + chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData()); } - }); - - // There is no need to set IChunkLoader for it, because the MemChunkLoader has already been set - // while creating ReadOnlyMemChunk - List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath); - if (memChunks != null) { - for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { - if (!memChunks.isEmpty()) { - chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData()); } } - } - /* - * remove not satisfied ChunkMetaData - */ - chunkMetadataList.removeIf( - chunkMetaData -> - (timeFilter != null - && !timeFilter.satisfyStartEndTime( - chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) - || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); - for (IChunkMetadata metadata : chunkMetadataList) { - metadata.setVersion(resource.getVersion()); + // remove not satisfied ChunkMetaData + long t2 = System.nanoTime(); + chunkMetadataList.removeIf( + chunkMetaData -> + (timeFilter != null + && !timeFilter.satisfyStartEndTime( + chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) + || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); + QUERY_METRICS.recordSeriesScanCost(CHUNK_METADATA_FILTER_ALIGNED_MEM, System.nanoTime() - t2); + + for (IChunkMetadata metadata : chunkMetadataList) { + metadata.setVersion(resource.getVersion()); + } + return chunkMetadataList; + } finally { + QUERY_METRICS.recordSeriesScanCost( + LOAD_CHUNK_METADATA_LIST_ALIGNED_MEM, System.nanoTime() - t1); } - return chunkMetadataList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java index 4e88609901..7e60df653d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -30,6 +31,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.util.List; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.CHUNK_METADATA_FILTER_NONALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_CHUNK_METADATA_LIST_NONALIGNED_MEM; + public class MemChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; @@ -37,6 +41,8 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader { private final QueryContext context; private final Filter timeFilter; + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + public MemChunkMetadataLoader( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter timeFilter) { this.resource = resource; @@ -47,42 +53,50 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader { @Override public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { - // There is no need to apply modifications to these, because we already do that while generating - // it in TSP - List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath); + long t1 = System.nanoTime(); + try { + // There is no need to apply modifications to these, because we already do that while + // generating it in TSP + List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(seriesPath); - // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is - // very cheap. - chunkMetadataList.forEach( - chunkMetadata -> { - if (chunkMetadata.needSetChunkLoader()) { - chunkMetadata.setFilePath(resource.getTsFilePath()); - chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); - } - }); + // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is + // very cheap. + chunkMetadataList.forEach( + chunkMetadata -> { + if (chunkMetadata.needSetChunkLoader()) { + chunkMetadata.setFilePath(resource.getTsFilePath()); + chunkMetadata.setClosed(resource.isClosed()); + chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); + } + }); - List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath); - if (memChunks != null) { - for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { - if (!memChunks.isEmpty()) { - chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData()); + List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath); + if (memChunks != null) { + for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { + if (!memChunks.isEmpty()) { + chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData()); + } } } - } - /* - * remove not satisfied ChunkMetaData - */ - chunkMetadataList.removeIf( - chunkMetaData -> - (timeFilter != null - && !timeFilter.satisfyStartEndTime( - chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) - || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); - for (IChunkMetadata metadata : chunkMetadataList) { - metadata.setVersion(resource.getVersion()); + // remove not satisfied ChunkMetaData + long t2 = System.nanoTime(); + chunkMetadataList.removeIf( + chunkMetaData -> + (timeFilter != null + && !timeFilter.satisfyStartEndTime( + chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) + || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); + QUERY_METRICS.recordSeriesScanCost( + CHUNK_METADATA_FILTER_NONALIGNED_MEM, System.nanoTime() - t2); + + for (IChunkMetadata metadata : chunkMetadataList) { + metadata.setVersion(resource.getVersion()); + } + return chunkMetadataList; + } finally { + QUERY_METRICS.recordSeriesScanCost( + LOAD_CHUNK_METADATA_LIST_NONALIGNED_MEM, System.nanoTime() - t1); } - return chunkMetadataList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java index 920080ec39..22096f8986 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.service.metrics; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.db.mpp.metric.QueryPlanCostMetricSet; +import org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet; import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics; import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics; @@ -35,5 +36,6 @@ public class DataNodeMetricsHelper { // bind query related metrics MetricService.getInstance().addMetricSet(new QueryPlanCostMetricSet()); + MetricService.getInstance().addMetricSet(new SeriesScanCostMetricSet()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index bfd6a0bf80..093fe15c5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache.TimeSeriesMetada import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader; import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader; @@ -51,8 +52,17 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_ALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_ALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_NONALIGNED_DISK; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_NONALIGNED_MEM; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.TIMESERIES_METADATA_MODIFICATION_ALIGNED; +import static org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.TIMESERIES_METADATA_MODIFICATION_NONALIGNED; + public class FileLoaderUtils { + private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance(); + private FileLoaderUtils() {} public static void loadOrGenerateResource(TsFileResource tsFileResource) throws IOException { @@ -124,51 +134,68 @@ public class FileLoaderUtils { Filter filter, Set<String> allSensors) throws IOException { + long t1 = System.nanoTime(); + boolean loadFromMem = false; + try { + // common path + TimeseriesMetadata timeSeriesMetadata; + // If the tsfile is closed, we need to load from tsfile + if (resource.isClosed()) { + // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex + // we should not ignore the non-exist of device in TsFileMetadata + timeSeriesMetadata = + TimeSeriesMetadataCache.getInstance() + .get( + new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey( + resource.getTsFilePath(), + seriesPath.getDevice(), + seriesPath.getMeasurement()), + allSensors, + resource.getTimeIndexType() != 1, + context.isDebug()); + if (timeSeriesMetadata != null) { + timeSeriesMetadata.setChunkMetadataLoader( + new DiskChunkMetadataLoader(resource, seriesPath, context, filter)); + } + } else { // if the tsfile is unclosed, we just get it directly from TsFileResource + loadFromMem = true; - // common path - TimeseriesMetadata timeSeriesMetadata; - // If the tsfile is closed, we need to load from tsfile - if (resource.isClosed()) { - // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex - // we should not ignore the non-exist of device in TsFileMetadata - timeSeriesMetadata = - TimeSeriesMetadataCache.getInstance() - .get( - new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey( - resource.getTsFilePath(), - seriesPath.getDevice(), - seriesPath.getMeasurement()), - allSensors, - resource.getTimeIndexType() != 1, - context.isDebug()); - if (timeSeriesMetadata != null) { - timeSeriesMetadata.setChunkMetadataLoader( - new DiskChunkMetadataLoader(resource, seriesPath, context, filter)); - } - } else { // if the tsfile is unclosed, we just get it directly from TsFileResource - timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata(seriesPath); - if (timeSeriesMetadata != null) { - timeSeriesMetadata.setChunkMetadataLoader( - new MemChunkMetadataLoader(resource, seriesPath, context, filter)); + timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata(seriesPath); + if (timeSeriesMetadata != null) { + timeSeriesMetadata.setChunkMetadataLoader( + new MemChunkMetadataLoader(resource, seriesPath, context, filter)); + } } - } - if (timeSeriesMetadata != null) { - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), seriesPath); - timeSeriesMetadata.setModified(!pathModifications.isEmpty()); - if (timeSeriesMetadata.getStatistics().getStartTime() - > timeSeriesMetadata.getStatistics().getEndTime()) { - return null; - } - if (filter != null - && !filter.satisfyStartEndTime( - timeSeriesMetadata.getStatistics().getStartTime(), - timeSeriesMetadata.getStatistics().getEndTime())) { - return null; + if (timeSeriesMetadata != null) { + long t2 = System.nanoTime(); + try { + List<Modification> pathModifications = + context.getPathModifications(resource.getModFile(), seriesPath); + timeSeriesMetadata.setModified(!pathModifications.isEmpty()); + if (timeSeriesMetadata.getStatistics().getStartTime() + > timeSeriesMetadata.getStatistics().getEndTime()) { + return null; + } + if (filter != null + && !filter.satisfyStartEndTime( + timeSeriesMetadata.getStatistics().getStartTime(), + timeSeriesMetadata.getStatistics().getEndTime())) { + return null; + } + } finally { + QUERY_METRICS.recordSeriesScanCost( + TIMESERIES_METADATA_MODIFICATION_NONALIGNED, System.nanoTime() - t2); + } } + return timeSeriesMetadata; + } finally { + QUERY_METRICS.recordSeriesScanCost( + loadFromMem + ? LOAD_TIMESERIES_METADATA_NONALIGNED_MEM + : LOAD_TIMESERIES_METADATA_NONALIGNED_DISK, + System.nanoTime() - t1); } - return timeSeriesMetadata; } /** @@ -181,87 +208,105 @@ public class FileLoaderUtils { public static AlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, AlignedPath vectorPath, QueryContext context, Filter filter) throws IOException { - AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null; - // If the tsfile is closed, we need to load from tsfile - if (resource.isClosed()) { - // load all the TimeseriesMetadata of vector, the first one is for time column and the - // remaining is for sub sensors - // the order of timeSeriesMetadata list is same as subSensorList's order - TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance(); - List<String> valueMeasurementList = vectorPath.getMeasurementList(); - Set<String> allSensors = new HashSet<>(valueMeasurementList); - allSensors.add(""); - boolean isDebug = context.isDebug(); - String filePath = resource.getTsFilePath(); - String deviceId = vectorPath.getDevice(); + long t1 = System.nanoTime(); + boolean loadFromMem = false; + try { + AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null; + // If the tsfile is closed, we need to load from tsfile + if (resource.isClosed()) { + // load all the TimeseriesMetadata of vector, the first one is for time column and the + // remaining is for sub sensors + // the order of timeSeriesMetadata list is same as subSensorList's order + TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance(); + List<String> valueMeasurementList = vectorPath.getMeasurementList(); + Set<String> allSensors = new HashSet<>(valueMeasurementList); + allSensors.add(""); + boolean isDebug = context.isDebug(); + String filePath = resource.getTsFilePath(); + String deviceId = vectorPath.getDevice(); - // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex - // we should not ignore the non-exist of device in TsFileMetadata - TimeseriesMetadata timeColumn = - cache.get( - new TimeSeriesMetadataCacheKey(filePath, deviceId, ""), - allSensors, - resource.getTimeIndexType() != 1, - isDebug); - if (timeColumn != null) { - List<TimeseriesMetadata> valueTimeSeriesMetadataList = - new ArrayList<>(valueMeasurementList.size()); - // if all the queried aligned sensors does not exist, we will return null - boolean exist = false; - for (String valueMeasurement : valueMeasurementList) { - TimeseriesMetadata valueColumn = - cache.get( - new TimeSeriesMetadataCacheKey(filePath, deviceId, valueMeasurement), - allSensors, - resource.getTimeIndexType() != 1, - isDebug); - exist = (exist || (valueColumn != null)); - valueTimeSeriesMetadataList.add(valueColumn); + // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex + // we should not ignore the non-exist of device in TsFileMetadata + TimeseriesMetadata timeColumn = + cache.get( + new TimeSeriesMetadataCacheKey(filePath, deviceId, ""), + allSensors, + resource.getTimeIndexType() != 1, + isDebug); + if (timeColumn != null) { + List<TimeseriesMetadata> valueTimeSeriesMetadataList = + new ArrayList<>(valueMeasurementList.size()); + // if all the queried aligned sensors does not exist, we will return null + boolean exist = false; + for (String valueMeasurement : valueMeasurementList) { + TimeseriesMetadata valueColumn = + cache.get( + new TimeSeriesMetadataCacheKey(filePath, deviceId, valueMeasurement), + allSensors, + resource.getTimeIndexType() != 1, + isDebug); + exist = (exist || (valueColumn != null)); + valueTimeSeriesMetadataList.add(valueColumn); + } + if (exist) { + alignedTimeSeriesMetadata = + new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList); + alignedTimeSeriesMetadata.setChunkMetadataLoader( + new DiskAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); + } } - if (exist) { - alignedTimeSeriesMetadata = - new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList); + } else { // if the tsfile is unclosed, we just get it directly from TsFileResource + loadFromMem = true; + + alignedTimeSeriesMetadata = + (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(vectorPath); + if (alignedTimeSeriesMetadata != null) { alignedTimeSeriesMetadata.setChunkMetadataLoader( - new DiskAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); + new MemAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); } } - } else { // if the tsfile is unclosed, we just get it directly from TsFileResource - alignedTimeSeriesMetadata = - (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(vectorPath); - if (alignedTimeSeriesMetadata != null) { - alignedTimeSeriesMetadata.setChunkMetadataLoader( - new MemAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); - } - } - if (alignedTimeSeriesMetadata != null) { - if (alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime() - > alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) { - return null; - } - if (filter != null - && !filter.satisfyStartEndTime( - alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime(), - alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) { - return null; - } + if (alignedTimeSeriesMetadata != null) { + long t2 = System.nanoTime(); + try { + if (alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime() + > alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) { + return null; + } + if (filter != null + && !filter.satisfyStartEndTime( + alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime(), + alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) { + return null; + } - // set modifications to each aligned path - List<TimeseriesMetadata> valueTimeSeriesMetadataList = - alignedTimeSeriesMetadata.getValueTimeseriesMetadataList(); - boolean modified = false; - for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { - if (valueTimeSeriesMetadataList.get(i) != null) { - List<Modification> pathModifications = - context.getPathModifications( - resource.getModFile(), vectorPath.getPathWithMeasurement(i)); - valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); - modified = (modified || !pathModifications.isEmpty()); + // set modifications to each aligned path + List<TimeseriesMetadata> valueTimeSeriesMetadataList = + alignedTimeSeriesMetadata.getValueTimeseriesMetadataList(); + boolean modified = false; + for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { + if (valueTimeSeriesMetadataList.get(i) != null) { + List<Modification> pathModifications = + context.getPathModifications( + resource.getModFile(), vectorPath.getPathWithMeasurement(i)); + valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); + modified = (modified || !pathModifications.isEmpty()); + } + } + alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified); + } finally { + QUERY_METRICS.recordSeriesScanCost( + TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() - t2); } } - alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified); + return alignedTimeSeriesMetadata; + } finally { + QUERY_METRICS.recordSeriesScanCost( + loadFromMem + ? LOAD_TIMESERIES_METADATA_ALIGNED_MEM + : LOAD_TIMESERIES_METADATA_ALIGNED_DISK, + System.nanoTime() - t1); } - return alignedTimeSeriesMetadata; } /** diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java index d1173ab6b3..b65b8a12a0 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -73,21 +73,25 @@ public class AlignedChunkReader implements IChunkReader { */ public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter filter) throws IOException { - this.filter = filter; - this.timeChunkDataBuffer = timeChunk.getData(); - this.valueDeleteIntervalList = new ArrayList<>(); - this.timeChunkHeader = timeChunk.getHeader(); - this.unCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType()); - this.currentTimestamp = Long.MIN_VALUE; - List<Statistics> valueChunkStatisticsList = new ArrayList<>(); - valueChunkList.forEach( - chunk -> { - valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader()); - valueChunkDataBufferList.add(chunk == null ? null : chunk.getData()); - valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic()); - valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList()); - }); - initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); + try { + this.filter = filter; + this.timeChunkDataBuffer = timeChunk.getData(); + this.valueDeleteIntervalList = new ArrayList<>(); + this.timeChunkHeader = timeChunk.getHeader(); + this.unCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType()); + this.currentTimestamp = Long.MIN_VALUE; + List<Statistics> valueChunkStatisticsList = new ArrayList<>(); + valueChunkList.forEach( + chunk -> { + valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader()); + valueChunkDataBufferList.add(chunk == null ? null : chunk.getData()); + valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic()); + valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList()); + }); + initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); + } finally { + + } } /**
