This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-6115-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f7a30b9746a3e1e2f7908d2690712050b48a7f62 Author: Jackie Tien <[email protected]> AuthorDate: Fri Aug 18 09:47:17 2023 +0800 [IOTDB-6115] Fix Limit & Offset push down doesn't take effect while there exist null value (cherry picked from commit 97e083efcec556c3ae1e3ba3609496fd4dbb1cbe) --- .../operator/source/AlignedSeriesScanOperator.java | 9 ++- .../operator/source/AlignedSeriesScanUtil.java | 67 +++++++++++++++++----- .../db/queryengine/plan/analyze/Analysis.java | 12 ++++ .../queryengine/plan/analyze/AnalyzeVisitor.java | 1 + .../db/queryengine/plan/parser/ASTVisitor.java | 8 +++ .../plan/planner/LogicalPlanBuilder.java | 10 +++- .../plan/planner/LogicalPlanVisitor.java | 6 +- .../plan/planner/OperatorTreeGenerator.java | 3 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 1 + .../plan/node/source/AlignedSeriesScanNode.java | 37 ++++++++++-- .../plan/statement/crud/QueryStatement.java | 12 ++++ .../memtable/AlignedReadOnlyMemChunk.java | 12 +++- .../read/reader/chunk/DiskAlignedChunkLoader.java | 13 ++++- .../read/reader/chunk/MemAlignedChunkLoader.java | 11 +++- .../read/reader/chunk/MemAlignedChunkReader.java | 8 ++- .../read/reader/chunk/MemAlignedPageReader.java | 31 +++++++--- .../metadata/DiskAlignedChunkMetadataLoader.java | 16 +++++- .../metadata/MemAlignedChunkMetadataLoader.java | 19 +++++- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 21 +++++-- .../operator/AlignedSeriesScanOperatorTest.java | 15 +++-- .../execution/operator/OperatorMemoryTest.java | 3 +- .../plan/plan/QueryLogicalPlanUtil.java | 5 +- .../reader/chunk/MemAlignedChunkLoaderTest.java | 2 +- .../read/TsFileAlignedSeriesReaderIterator.java | 2 +- .../read/reader/chunk/AlignedChunkReader.java | 22 ++++--- .../tsfile/read/reader/page/AlignedPageReader.java | 29 +++++++--- .../read/reader/series/FileSeriesReader.java | 2 +- .../tsfile/write/TsFileIntegrityCheckingTool.java | 3 +- 28 files changed, 300 insertions(+), 80 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index fcf04159acb..06e7953b715 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -46,12 +46,17 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { PlanNodeId sourceId, AlignedPath seriesPath, Ordering scanOrder, - SeriesScanOptions seriesScanOptions) { + SeriesScanOptions seriesScanOptions, + boolean queryAllSensors) { this.sourceId = sourceId; this.operatorContext = context; this.seriesScanUtil = new AlignedSeriesScanUtil( - seriesPath, scanOrder, seriesScanOptions, context.getInstanceContext()); + seriesPath, + scanOrder, + seriesScanOptions, + context.getInstanceContext(), + queryAllSensors); // time + all value columns this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); this.valueColumnCount = seriesPath.getColumnNum(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index d36de2e4bf7..300412081f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -49,16 +49,32 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { private final List<TSDataType> dataTypes; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + public AlignedSeriesScanUtil( PartialPath seriesPath, Ordering scanOrder, SeriesScanOptions scanOptions, FragmentInstanceContext context) { + this(seriesPath, scanOrder, scanOptions, context, false); + } + + public AlignedSeriesScanUtil( + PartialPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + boolean queryAllSensors) { super(seriesPath, scanOrder, scanOptions, context); dataTypes = ((AlignedPath) seriesPath) .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); isAligned = true; + this.queryAllSensors = queryAllSensors; } @SuppressWarnings("squid:S3740") @@ -122,7 +138,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { Set<String> allSensors) throws IOException { return FileLoaderUtils.loadTimeSeriesMetadata( - resource, (AlignedPath) seriesPath, context, filter); + resource, (AlignedPath) seriesPath, context, filter, queryAllSensors); } @Override @@ -152,19 +168,31 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { @SuppressWarnings("squid:S3740") private void skipOffsetByTimeSeriesMetadata() { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. + // For aligned series, When we only query some measurements under an aligned device, if any + // values of these queried measurements has the same value count as the time column, the + // timestamp will be selected. // NOTE: if we change the query semantic in the future for aligned series, we need to remove // this check here. long rowCount = ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getTimeStatistics().getCount(); - for (Statistics statistics : - ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; + boolean canUse = + queryAllSensors + || ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata) + .getValueStatisticsList() + .isEmpty(); + if (!canUse) { + for (Statistics statistics : + ((AlignedTimeSeriesMetadata) firstTimeSeriesMetadata).getValueStatisticsList()) { + if (statistics != null && !statistics.hasNullValue(rowCount)) { + canUse = true; + break; + } } } + + if (!canUse) { + return; + } // When the number of points in all value chunk groups is the same as that in the time chunk // group, it means that there is no null value, and all timestamps will be selected. if (paginationController.hasCurOffset(rowCount)) { @@ -188,18 +216,27 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { @SuppressWarnings("squid:S3740") private void skipOffsetByChunkMetadata() { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not - // be selected. + // For aligned series, When we only query some measurements under an aligned device, if any + // values of these queried measurements has the same value count as the time column, the + // timestamp will be selected. // NOTE: if we change the query semantic in the future for aligned series, we need to remove // this check here. long rowCount = firstChunkMetadata.getStatistics().getCount(); - for (Statistics statistics : - ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { - if (statistics == null || statistics.hasNullValue(rowCount)) { - return; + boolean canUse = + queryAllSensors + || ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList().isEmpty(); + if (!canUse) { + for (Statistics statistics : + ((AlignedChunkMetadata) firstChunkMetadata).getValueStatisticsList()) { + if (statistics != null && !statistics.hasNullValue(rowCount)) { + canUse = true; + break; + } } } + if (!canUse) { + return; + } // When the number of points in all value chunks is the same as that in the time chunk, it // means that there is no null value, and all timestamps will be selected. if (paginationController.hasCurOffset(rowCount)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 05329d849e2..1b900c0e425 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -255,6 +255,10 @@ public class Analysis { // extra message from config node, queries wll be sent to these Running DataNodes private List<TDataNodeLocation> runningDataNodeLocations; + // used for limit and offset push down optimizer, if we select all columns from aligned device, we + // can use statistics to skip + private boolean lastLevelUseWildcard = false; + public Analysis() { this.finishQueryAfterAnalyze = false; } @@ -732,4 +736,12 @@ public class Analysis { public Map<String, Set<Expression>> getDeviceToOutputExpressions() { return deviceToOutputExpressions; } + + public boolean isLastLevelUseWildcard() { + return lastLevelUseWildcard; + } + + public void setLastLevelUseWildcard(boolean lastLevelUseWildcard) { + this.lastLevelUseWildcard = lastLevelUseWildcard; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 50fa6b50988..5f56411ac91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -226,6 +226,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) { Analysis analysis = new Analysis(); + analysis.setLastLevelUseWildcard(queryStatement.isLastLevelUseWildcard()); try { // check for semantic errors queryStatement.semanticCheck(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index b57af1905f3..c03775b3d39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -263,6 +263,8 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { private boolean useWildcard = false; + private boolean lastLevelUseWildcard = false; + public void setZoneId(ZoneId zoneId) { this.zoneId = zoneId; } @@ -1390,6 +1392,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { } queryStatement.setUseWildcard(useWildcard); + queryStatement.setLastLevelUseWildcard(lastLevelUseWildcard); return queryStatement; } @@ -1938,6 +1941,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { path[i] = parseNodeName(nodeNames.get(i)); } } + if (!lastLevelUseWildcard + && !nodeNames.isEmpty() + && !nodeNames.get(nodeNames.size() - 1).wildcard().isEmpty()) { + lastLevelUseWildcard = true; + } return new PartialPath(path); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 0b80f58961c..346888d7ac9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -168,7 +168,10 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planRawDataSource( - Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) { + Set<Expression> sourceExpressions, + Ordering scanOrder, + Filter timeFilter, + boolean lastLevelUseWildcard) { List<PlanNode> sourceNodeList = new ArrayList<>(); List<PartialPath> selectedPaths = sourceExpressions.stream() @@ -187,7 +190,10 @@ public class LogicalPlanBuilder { } else if (path instanceof AlignedPath) { // aligned series AlignedSeriesScanNode alignedSeriesScanNode = new AlignedSeriesScanNode( - context.getQueryId().genPlanNodeId(), (AlignedPath) path, scanOrder); + context.getQueryId().genPlanNodeId(), + (AlignedPath) path, + scanOrder, + lastLevelUseWildcard); alignedSeriesScanNode.setTimeFilter(timeFilter); // TODO: push down value filter alignedSeriesScanNode.setValueFilter(timeFilter); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 2ab1fac6bbc..86b98eda96a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -234,7 +234,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte .planRawDataSource( sourceExpressions, queryStatement.getResultTimeOrder(), - analysis.getGlobalTimeFilter()) + analysis.getGlobalTimeFilter(), + analysis.isLastLevelUseWildcard()) .planWhereAndSourceTransform( whereExpression, sourceTransformExpressions, @@ -255,7 +256,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte .planRawDataSource( sourceExpressions, queryStatement.getResultTimeOrder(), - analysis.getGlobalTimeFilter()) + analysis.getGlobalTimeFilter(), + analysis.isLastLevelUseWildcard()) .planWhereAndSourceTransform( whereExpression, sourceTransformExpressions, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 4dfec56a44e..48d2417f8a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -344,7 +344,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), seriesPath, node.getScanOrder(), - seriesScanOptionsBuilder.build()); + seriesScanOptionsBuilder.build(), + node.isQueryAllSensors()); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 08607d09cff..ec63414d2f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -125,6 +125,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter if (offset > 0) { boxValue.add(String.format("Offset: %s", offset)); } + boxValue.add(String.format("QueryAllSensors: %s", node.isQueryAllSensors())); boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index 655aa15c7c4..28cecd5f4e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -67,6 +67,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { // offset for result set. The default value is 0 private long offset; + // used for limit and offset push down optimizer, if we select all columns from aligned device, we + // can use statistics to skip + private boolean queryAllSensors = false; + // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; @@ -75,9 +79,11 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { this.alignedPath = alignedPath; } - public AlignedSeriesScanNode(PlanNodeId id, AlignedPath alignedPath, Ordering scanOrder) { + public AlignedSeriesScanNode( + PlanNodeId id, AlignedPath alignedPath, Ordering scanOrder, boolean lastLevelUseWildcard) { this(id, alignedPath); this.scanOrder = scanOrder; + this.queryAllSensors = lastLevelUseWildcard; } public AlignedSeriesScanNode( @@ -88,8 +94,9 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { @Nullable Filter valueFilter, long limit, long offset, - TRegionReplicaSet dataRegionReplicaSet) { - this(id, alignedPath, scanOrder); + TRegionReplicaSet dataRegionReplicaSet, + boolean lastLevelUseWildcard) { + this(id, alignedPath, scanOrder, lastLevelUseWildcard); this.timeFilter = timeFilter; this.valueFilter = valueFilter; this.limit = limit; @@ -169,6 +176,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { return NO_CHILD_ALLOWED; } + public boolean isQueryAllSensors() { + return queryAllSensors; + } + @Override public void addChild(PlanNode child) { throw new UnsupportedOperationException("no child is allowed for AlignedSeriesScanNode"); @@ -184,7 +195,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { getValueFilter(), getLimit(), getOffset(), - this.regionReplicaSet); + this.regionReplicaSet, + this.queryAllSensors); } @Override @@ -221,6 +233,7 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { } ReadWriteIOUtils.write(limit, byteBuffer); ReadWriteIOUtils.write(offset, byteBuffer); + ReadWriteIOUtils.write(queryAllSensors, byteBuffer); } @Override @@ -242,6 +255,7 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { } ReadWriteIOUtils.write(limit, stream); ReadWriteIOUtils.write(offset, stream); + ReadWriteIOUtils.write(queryAllSensors, stream); } public static AlignedSeriesScanNode deserialize(ByteBuffer byteBuffer) { @@ -259,9 +273,18 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { } long limit = ReadWriteIOUtils.readLong(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); + boolean queryAllSensors = ReadWriteIOUtils.readBool(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new AlignedSeriesScanNode( - planNodeId, alignedPath, scanOrder, timeFilter, valueFilter, limit, offset, null); + planNodeId, + alignedPath, + scanOrder, + timeFilter, + valueFilter, + limit, + offset, + null, + queryAllSensors); } @Override @@ -282,6 +305,7 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { && scanOrder == that.scanOrder && Objects.equals(timeFilter, that.timeFilter) && Objects.equals(valueFilter, that.valueFilter) + && Objects.equals(queryAllSensors, that.queryAllSensors) && Objects.equals(regionReplicaSet, that.regionReplicaSet); } @@ -295,7 +319,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { valueFilter, limit, offset, - regionReplicaSet); + regionReplicaSet, + queryAllSensors); } public String toString() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java index 973414be8e1..8179eda9fbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/QueryStatement.java @@ -114,6 +114,10 @@ public class QueryStatement extends Statement { private boolean useWildcard = true; + // used for limit and offset push down optimizer, if we select all columns from aligned device, we + // can use statistics to skip + private boolean lastLevelUseWildcard = false; + public QueryStatement() { this.statementType = StatementType.QUERY; } @@ -465,6 +469,14 @@ public class QueryStatement extends Statement { return useWildcard; } + public boolean isLastLevelUseWildcard() { + return lastLevelUseWildcard; + } + + public void setLastLevelUseWildcard(boolean lastLevelUseWildcard) { + this.lastLevelUseWildcard = lastLevelUseWildcard; + } + public static final String RAW_AGGREGATION_HYBRID_QUERY_ERROR_MSG = "Raw data and aggregation hybrid query is not supported."; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java index 04d9a2b367b..cf899441d49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java @@ -45,6 +45,12 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { private final List<TSDataType> dataTypes; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private boolean queryAllSensors; + /** * The constructor for Aligned type. * @@ -146,7 +152,7 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { } IChunkMetadata alignedChunkMetadata = new AlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList); - alignedChunkMetadata.setChunkLoader(new MemAlignedChunkLoader(this)); + alignedChunkMetadata.setChunkLoader(new MemAlignedChunkLoader(this, queryAllSensors)); alignedChunkMetadata.setVersion(Long.MAX_VALUE); cachedMetaData = alignedChunkMetadata; } @@ -160,4 +166,8 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk { public IPointReader getPointReader() { return tsBlock.getTsBlockAlignedRowIterator(); } + + public void setQueryAllSensors(boolean queryAllSensors) { + this.queryAllSensors = queryAllSensors; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java index 18907c22b3e..5b0165b2f7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java @@ -40,11 +40,19 @@ import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.INI public class DiskAlignedChunkLoader implements IChunkLoader { private final boolean debug; + + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); - public DiskAlignedChunkLoader(boolean debug) { + public DiskAlignedChunkLoader(boolean debug, boolean queryAllSensors) { this.debug = debug; + this.queryAllSensors = queryAllSensors; } @Override @@ -75,7 +83,8 @@ public class DiskAlignedChunkLoader implements IChunkLoader { } long t2 = System.nanoTime(); - IChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); + IChunkReader chunkReader = + new AlignedChunkReader(timeChunk, valueChunkList, timeFilter, queryAllSensors); SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost( INIT_CHUNK_READER_ALIGNED_DISK, System.nanoTime() - t2); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoader.java index a93a25bb7b1..63feb2220f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoader.java @@ -36,11 +36,18 @@ public class MemAlignedChunkLoader implements IChunkLoader { private final AlignedReadOnlyMemChunk chunk; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); - public MemAlignedChunkLoader(AlignedReadOnlyMemChunk chunk) { + public MemAlignedChunkLoader(AlignedReadOnlyMemChunk chunk, boolean queryAllSensors) { this.chunk = chunk; + this.queryAllSensors = queryAllSensors; } @Override @@ -57,7 +64,7 @@ public class MemAlignedChunkLoader implements IChunkLoader { public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) { long startTime = System.nanoTime(); try { - return new MemAlignedChunkReader(chunk, timeFilter); + return new MemAlignedChunkReader(chunk, timeFilter, queryAllSensors); } finally { long duration = System.nanoTime() - startTime; SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkReader.java index b7ff4cc89ed..66f36fea0b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkReader.java @@ -33,16 +33,18 @@ import java.util.List; /** To read aligned chunk data in memory. */ public class MemAlignedChunkReader implements IChunkReader { - private List<IPageReader> pageReaderList; + private final List<IPageReader> pageReaderList; - public MemAlignedChunkReader(AlignedReadOnlyMemChunk readableChunk, Filter filter) { + public MemAlignedChunkReader( + AlignedReadOnlyMemChunk readableChunk, Filter filter, boolean queryAllSensors) { // we treat one ReadOnlyMemChunk as one Page this.pageReaderList = Collections.singletonList( new MemAlignedPageReader( readableChunk.getTsBlock(), (AlignedChunkMetadata) readableChunk.getChunkMetaData(), - filter)); + filter, + queryAllSensors)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java index b1bba47f129..3f305da610d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedPageReader.java @@ -46,15 +46,23 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { private final TsBlock tsBlock; private final AlignedChunkMetadata chunkMetadata; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + private Filter valueFilter; private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER; private TsBlockBuilder builder; - public MemAlignedPageReader(TsBlock tsBlock, AlignedChunkMetadata chunkMetadata, Filter filter) { + public MemAlignedPageReader( + TsBlock tsBlock, AlignedChunkMetadata chunkMetadata, Filter filter, boolean queryAllSensors) { this.tsBlock = tsBlock; this.chunkMetadata = chunkMetadata; this.valueFilter = filter; + this.queryAllSensors = queryAllSensors; } @Override @@ -101,17 +109,24 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { private boolean pageSatisfy() { Statistics<? extends Serializable> statistics = getStatistics(); if (valueFilter == null || valueFilter.allSatisfy(statistics)) { - // For aligned series, When we only read some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not be - // selected. - // NOTE: if we change the read semantic in the future for aligned series, we need to remove + // For aligned series, When we only query some measurements under an aligned device, if any + // values of these queried measurements has the same value count as the time column, the + // timestamp will be selected. + // NOTE: if we change the query semantic in the future for aligned series, we need to remove // this check here. long rowCount = getTimeStatistics().getCount(); - for (Statistics<? extends Serializable> vStatistics : getValueStatisticsList()) { - if (vStatistics == null || vStatistics.hasNullValue(rowCount)) { - return true; + boolean canUse = queryAllSensors || getValueStatisticsList().isEmpty(); + if (!canUse) { + for (Statistics<? extends Serializable> vStatistics : getValueStatisticsList()) { + if (vStatistics != null && !vStatistics.hasNullValue(rowCount)) { + canUse = true; + break; + } } } + if (!canUse) { + return true; + } // When the number of points in all value pages is the same as that in the time page, it means // that there is no null value, and all timestamps will be selected. if (paginationController.hasCurOffset(rowCount)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index 96326fb9443..95dbfd6e4e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java @@ -51,16 +51,27 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { // time filter or value filter, only used to check time range private final Filter filter; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); public DiskAlignedChunkMetadataLoader( - TsFileResource resource, AlignedPath seriesPath, QueryContext context, Filter filter) { + TsFileResource resource, + AlignedPath seriesPath, + QueryContext context, + Filter filter, + boolean queryAllSensors) { this.resource = resource; this.seriesPath = seriesPath; this.context = context; this.filter = filter; + this.queryAllSensors = queryAllSensors; } @Override @@ -111,7 +122,8 @@ public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { if (chunkMetadata.needSetChunkLoader()) { chunkMetadata.setFilePath(resource.getTsFilePath()); chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); + chunkMetadata.setChunkLoader( + new DiskAlignedChunkLoader(context.isDebug(), queryAllSensors)); } }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java index 90bbc3136ad..17bd551c7ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; +import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedReadOnlyMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.DiskAlignedChunkLoader; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -41,16 +42,26 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final PartialPath seriesPath; private final QueryContext context; private final Filter timeFilter; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET = SeriesScanCostMetricSet.getInstance(); public MemAlignedChunkMetadataLoader( - TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter timeFilter) { + TsFileResource resource, + PartialPath seriesPath, + QueryContext context, + Filter timeFilter, + boolean queryAllSensors) { this.resource = resource; this.seriesPath = seriesPath; this.context = context; this.timeFilter = timeFilter; + this.queryAllSensors = queryAllSensors; } @Override @@ -66,7 +77,8 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { if (chunkMetadata.needSetChunkLoader()) { chunkMetadata.setFilePath(resource.getTsFilePath()); chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); + chunkMetadata.setChunkLoader( + new DiskAlignedChunkLoader(context.isDebug(), queryAllSensors)); } }); @@ -76,7 +88,8 @@ public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(seriesPath); if (memChunks != null) { for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { - if (!memChunks.isEmpty()) { + if (!readOnlyMemChunk.isEmpty()) { + ((AlignedReadOnlyMemChunk) readOnlyMemChunk).setQueryAllSensors(queryAllSensors); chunkMetadataList.add(readOnlyMemChunk.getChunkMetaData()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 2f243b2b508..55d9f04e30e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -201,7 +201,11 @@ public class FileLoaderUtils { * @throws IOException IOException may be thrown while reading it from disk. */ public static AlignedTimeSeriesMetadata loadTimeSeriesMetadata( - TsFileResource resource, AlignedPath alignedPath, QueryContext context, Filter filter) + TsFileResource resource, + AlignedPath alignedPath, + QueryContext context, + Filter filter, + boolean queryAllSensors) throws IOException { final long t1 = System.nanoTime(); boolean loadFromMem = false; @@ -209,14 +213,16 @@ public class FileLoaderUtils { AlignedTimeSeriesMetadata alignedTimeSeriesMetadata; // If the tsfile is closed, we need to load from tsfile if (resource.isClosed()) { - alignedTimeSeriesMetadata = loadFromDisk(resource, alignedPath, context, filter); + alignedTimeSeriesMetadata = + loadFromDisk(resource, alignedPath, context, filter, queryAllSensors); } else { // if the tsfile is unclosed, we just get it directly from TsFileResource loadFromMem = true; alignedTimeSeriesMetadata = (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(alignedPath); if (alignedTimeSeriesMetadata != null) { alignedTimeSeriesMetadata.setChunkMetadataLoader( - new MemAlignedChunkMetadataLoader(resource, alignedPath, context, filter)); + new MemAlignedChunkMetadataLoader( + resource, alignedPath, context, filter, queryAllSensors)); } } @@ -252,7 +258,11 @@ public class FileLoaderUtils { } private static AlignedTimeSeriesMetadata loadFromDisk( - TsFileResource resource, AlignedPath alignedPath, QueryContext context, Filter filter) + TsFileResource resource, + AlignedPath alignedPath, + QueryContext context, + Filter filter, + boolean queryAllSensors) throws IOException { AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null; // load all the TimeseriesMetadata of vector, the first one is for time column and the @@ -293,7 +303,8 @@ public class FileLoaderUtils { alignedTimeSeriesMetadata = new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList); alignedTimeSeriesMetadata.setChunkMetadataLoader( - new DiskAlignedChunkMetadataLoader(resource, alignedPath, context, filter)); + new DiskAlignedChunkMetadataLoader( + resource, alignedPath, context, filter, queryAllSensors)); } } return alignedTimeSeriesMetadata; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java index d01f187d40f..bc3dbe0cf90 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java @@ -127,7 +127,8 @@ public class AlignedSeriesScanOperatorTest { planNodeId, alignedPath, Ordering.ASC, - getDefaultSeriesScanOptions(alignedPath)); + getDefaultSeriesScanOptions(alignedPath), + false); seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator .getOperatorContext() @@ -220,7 +221,8 @@ public class AlignedSeriesScanOperatorTest { planNodeId1, alignedPath1, Ordering.ASC, - getDefaultSeriesScanOptions(alignedPath1)); + getDefaultSeriesScanOptions(alignedPath1), + false); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() @@ -241,7 +243,8 @@ public class AlignedSeriesScanOperatorTest { planNodeId2, alignedPath2, Ordering.ASC, - getDefaultSeriesScanOptions(alignedPath2)); + getDefaultSeriesScanOptions(alignedPath2), + false); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() @@ -509,7 +512,8 @@ public class AlignedSeriesScanOperatorTest { planNodeId1, alignedPath1, Ordering.DESC, - getDefaultSeriesScanOptions(alignedPath1)); + getDefaultSeriesScanOptions(alignedPath1), + false); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() @@ -530,7 +534,8 @@ public class AlignedSeriesScanOperatorTest { planNodeId2, alignedPath2, Ordering.DESC, - getDefaultSeriesScanOptions(alignedPath2)); + getDefaultSeriesScanOptions(alignedPath2), + false); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index ba76afd9f58..ad3065bc6ee 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -191,7 +191,8 @@ public class OperatorMemoryTest { planNodeId, alignedPath, Ordering.ASC, - SeriesScanOptions.getDefaultSeriesScanOptions(alignedPath)); + SeriesScanOptions.getDefaultSeriesScanOptions(alignedPath), + false); long maxPeekMemory = Math.max( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java index 0b2ea440c5c..b36feab3cff 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/QueryLogicalPlanUtil.java @@ -172,7 +172,10 @@ public class QueryLogicalPlanUtil { Ordering.ASC)); sourceNodeList.add( new AlignedSeriesScanNode( - queryId.genPlanNodeId(), (AlignedPath) schemaMap.get("root.sg.d2.a"), Ordering.ASC)); + queryId.genPlanNodeId(), + (AlignedPath) schemaMap.get("root.sg.d2.a"), + Ordering.ASC, + false)); for (PlanNode sourceNode : sourceNodeList) { if (sourceNode instanceof SeriesScanNode) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java index e6f4b4475e6..8e1ba4f1bbf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java @@ -52,7 +52,7 @@ public class MemAlignedChunkLoaderTest { AlignedReadOnlyMemChunk chunk = Mockito.mock(AlignedReadOnlyMemChunk.class); ChunkMetadata chunkMetadata = Mockito.mock(ChunkMetadata.class); - MemAlignedChunkLoader memAlignedChunkLoader = new MemAlignedChunkLoader(chunk); + MemAlignedChunkLoader memAlignedChunkLoader = new MemAlignedChunkLoader(chunk, false); try { memAlignedChunkLoader.loadChunk(chunkMetadata); fail(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java index 2113a523fcd..f150bfd19e4 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java @@ -82,7 +82,7 @@ public class TsFileAlignedSeriesReaderIterator { } AlignedChunkReader chunkReader = - new AlignedChunkReader(timeChunk, Arrays.asList(valueChunks), null); + new AlignedChunkReader(timeChunk, Arrays.asList(valueChunks), null, false); return new NextAlignedChunkInfo(chunkReader, totalSize, notNullChunkNum, totalPointNum); } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java index 86bd53f5616..d2751df96c9 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -92,7 +92,8 @@ public class AlignedChunkReader implements IChunkReader { * * @param filter filter */ - public AlignedChunkReader(Chunk timeChunk, List<Chunk> valueChunkList, Filter filter) + public AlignedChunkReader( + Chunk timeChunk, List<Chunk> valueChunkList, Filter filter, boolean queryAllSensors) throws IOException { this.filter = filter; this.timeChunkDataBuffer = timeChunk.getData(); @@ -108,7 +109,7 @@ public class AlignedChunkReader implements IChunkReader { valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic()); valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList()); }); - initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); + initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList, queryAllSensors); } /** @@ -132,12 +133,14 @@ public class AlignedChunkReader implements IChunkReader { valueChunkStatisticsList.add(chunk == null ? null : chunk.getChunkStatistic()); valueDeleteIntervalList.add(chunk == null ? null : chunk.getDeleteIntervalList()); }); - initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList); + initAllPageReaders(timeChunk.getChunkStatistic(), valueChunkStatisticsList, false); } /** construct all the page readers in this chunk */ private void initAllPageReaders( - Statistics timeChunkStatistics, List<Statistics> valueChunkStatisticsList) + Statistics timeChunkStatistics, + List<Statistics> valueChunkStatisticsList, + boolean queryAllSensors) throws IOException { // construct next satisfied page header while (timeChunkDataBuffer.remaining() > 0) { @@ -176,7 +179,7 @@ public class AlignedChunkReader implements IChunkReader { // if the current page satisfies if (exits && timePageSatisfied(timePageHeader)) { AlignedPageReader alignedPageReader = - constructPageReaderForNextPage(timePageHeader, valuePageHeaderList); + constructPageReaderForNextPage(timePageHeader, valuePageHeaderList, queryAllSensors); if (alignedPageReader != null) { pageReaderList.add(alignedPageReader); } @@ -213,7 +216,8 @@ public class AlignedChunkReader implements IChunkReader { } private AlignedPageReader constructPageReaderForNextPage( - PageHeader timePageHeader, List<PageHeader> valuePageHeader) throws IOException { + PageHeader timePageHeader, List<PageHeader> valuePageHeader, boolean queryAllSensors) + throws IOException { PageInfo timePageInfo = new PageInfo(); getPageInfo(timePageHeader, timeChunkDataBuffer, timeChunkHeader, timePageInfo); PageInfo valuePageInfo = new PageInfo(); @@ -266,7 +270,8 @@ public class AlignedChunkReader implements IChunkReader { valuePageDataList, valueDataTypeList, valueDecoderList, - filter); + filter, + queryAllSensors); alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList); return alignedPageReader; } @@ -314,7 +319,8 @@ public class AlignedChunkReader implements IChunkReader { uncompressedValuePageDatas, valueTypes, valueDecoders, - null); + null, + false); alignedPageReader.initTsBlockBuilder(valueTypes); alignedPageReader.setDeleteIntervalList(valueDeleteIntervalList); return alignedPageReader.getAllSatisfiedData(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index 0a93e92f423..8aaa3484f83 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -50,6 +50,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { private final List<ValuePageReader> valuePageReaderList; private final int valueCount; + // only used for limit and offset push down optimizer, if we select all columns from aligned + // device, we + // can use statistics to skip. + // it's only exact while using limit & offset push down + private final boolean queryAllSensors; + private Filter filter; private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER; @@ -67,7 +73,8 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { List<ByteBuffer> valuePageDataList, List<TSDataType> valueDataTypeList, List<Decoder> valueDecoderList, - Filter filter) { + Filter filter, + boolean queryAllSensors) { timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder); isModified = timePageReader.isModified(); valuePageReaderList = new ArrayList<>(valuePageHeaderList.size()); @@ -87,6 +94,7 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { } this.filter = filter; this.valueCount = valuePageReaderList.size(); + this.queryAllSensors = queryAllSensors; } @Override @@ -120,17 +128,24 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { private boolean pageSatisfy() { Statistics statistics = getStatistics(); if (filter == null || filter.allSatisfy(statistics)) { - // For aligned series, When we only query some measurements under an aligned device, if the - // values of these queried measurements at a timestamp are all null, the timestamp will not be - // selected. + // For aligned series, When we only query some measurements under an aligned device, if any + // values of these queried measurements has the same value count as the time column, the + // timestamp will be selected. // NOTE: if we change the query semantic in the future for aligned series, we need to remove // this check here. long rowCount = getTimeStatistics().getCount(); - for (Statistics vStatistics : getValueStatisticsList()) { - if (vStatistics == null || vStatistics.hasNullValue(rowCount)) { - return true; + boolean canUse = queryAllSensors || getValueStatisticsList().isEmpty(); + if (!canUse) { + for (Statistics vStatistics : getValueStatisticsList()) { + if (vStatistics != null && !vStatistics.hasNullValue(rowCount)) { + canUse = true; + break; + } } } + if (!canUse) { + return true; + } // When the number of points in all value pages is the same as that in the time page, it means // that there is no null value, and all timestamps will be selected. if (paginationController.hasCurOffset(rowCount)) { diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java index 222cc59a5c9..2e1e5090bd8 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java @@ -56,7 +56,7 @@ public class FileSeriesReader extends AbstractFileSeriesReader { for (IChunkMetadata metadata : alignedChunkMetadata.getValueChunkMetadataList()) { valueChunkList.add(chunkLoader.loadChunk((ChunkMetadata) metadata)); } - this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, filter); + this.chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, filter, false); } } diff --git a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java index be55f4a0449..c753508b8f0 100644 --- a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java +++ b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java @@ -224,7 +224,8 @@ public class TsFileIntegrityCheckingTool { reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx)); // construct an aligned chunk reader using time chunk and value chunk IChunkReader chunkReader = - new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null); + new AlignedChunkReader( + timeChunk, Collections.singletonList(valueChunk), null, false); // verify the values List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx); for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
