This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/refactorFilter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e84bf8011263eb392004ed8cde6e1ee9f8a064d Author: Minghui Liu <[email protected]> AuthorDate: Sun Nov 19 21:07:32 2023 +0800 rename node --- .../queryengine/plan/analyze/PredicateUtils.java | 6 ++ .../visitor/predicate/ReversePredicateVisitor.java | 12 +-- .../predicate/SchemaCompatibilityValidator.java | 43 +++++----- .../plan/optimization/LimitOffsetPushDown.java | 8 +- .../plan/planner/OperatorTreeGenerator.java | 91 ++++++++-------------- .../plan/planner/plan/node/PlanGraphPrinter.java | 4 +- .../plan/node/source/AlignedLastQueryScanNode.java | 2 +- .../plan/node/source/AlignedSeriesScanNode.java | 61 ++++++++------- .../plan/node/source/LastQueryScanNode.java | 2 +- .../plan/node/source/LastSeriesSourceNode.java | 7 ++ .../node/source/SeriesAggregationSourceNode.java | 1 + .../planner/plan/node/source/SeriesScanNode.java | 63 +++++++-------- .../planner/plan/node/source/SeriesSourceNode.java | 3 + .../planner/plan/parameter/SeriesScanOptions.java | 36 ++++----- .../plan/optimization/TestPlanBuilder.java | 8 +- .../AlignByDeviceOrderByLimitOffsetTest.java | 4 +- .../AlignedSeriesScanLimitOffsetPushDownTest.java | 4 +- .../series/SeriesScanLimitOffsetPushDownTest.java | 4 +- 18 files changed, 170 insertions(+), 189 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java index 06230f7cdc3..6b72c27e9a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.queryengine.plan.analyze; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.ConvertExpressionToFilterVisitor; +import org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.SchemaCompatibilityValidator; import org.apache.iotdb.tsfile.read.filter.basic.Filter; public class PredicateUtils { @@ -29,6 +31,10 @@ public class PredicateUtils { // util class } + public static void validateSchemaCompatibility(Expression predicate, PartialPath path) { + SchemaCompatibilityValidator.validate(predicate, path); + } + public static Filter convertPredicateToFilter(Expression predicate, TypeProvider typeProvider) { return predicate.accept(new ConvertExpressionToFilterVisitor(), typeProvider); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ReversePredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ReversePredicateVisitor.java index 6718f5ff9d0..0188a247d57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ReversePredicateVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ReversePredicateVisitor.java @@ -50,12 +50,6 @@ public class ReversePredicateVisitor extends PredicateVisitor<Expression, Void> inExpression.getExpression(), !inExpression.isNotIn(), inExpression.getValues()); } - @Override - public Expression visitGroupByTimeExpression( - GroupByTimeExpression groupByTimeExpression, Void context) { - throw new UnsupportedOperationException("GROUP BY TIME doesn't support reverse predicate"); - } - @Override public Expression visitIsNullExpression(IsNullExpression isNullExpression, Void context) { return new IsNullExpression(isNullExpression.getExpression(), !isNullExpression.isNot()); @@ -145,4 +139,10 @@ public class ReversePredicateVisitor extends PredicateVisitor<Expression, Void> betweenExpression.getThirdExpression(), !betweenExpression.isNotBetween()); } + + @Override + public Expression visitGroupByTimeExpression( + GroupByTimeExpression groupByTimeExpression, Void context) { + throw new UnsupportedOperationException("GROUP BY TIME cannot be reversed"); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/SchemaCompatibilityValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/SchemaCompatibilityValidator.java index d4fe7424896..57c8d0c9f0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/SchemaCompatibilityValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/SchemaCompatibilityValidator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -44,35 +45,29 @@ import org.apache.iotdb.db.queryengine.plan.expression.unary.RegularExpression; public class SchemaCompatibilityValidator extends PredicateVisitor<Void, SchemaCompatibilityValidator.Context> { - public static void validate(Expression predicate) { - predicate.accept(new SchemaCompatibilityValidator(), new Context()); + public static void validate(Expression predicate, PartialPath path) { + predicate.accept(new SchemaCompatibilityValidator(), new Context(path)); } protected static class Context { - private PartialPath checkedPath; - private boolean isAligned; + private final PartialPath checkedPath; + private final boolean isAligned; - public Context() { - // needn't init + public Context(PartialPath path) { + this.isAligned = path instanceof AlignedPath; + this.checkedPath = this.isAligned ? path.getDevicePath() : path; } public void check(MeasurementPath path) { - if (checkedPath == null) { - // init checkedPath - isAligned = path.isUnderAlignedEntity(); - checkedPath = getComparePath(path); - } else { - if (isAligned != path.isUnderAlignedEntity() || !checkedPath.equals(getComparePath(path))) { - throw new IllegalArgumentException( - "The paths in the predicate are not compatible with each other."); - } + if (this.isAligned != path.isUnderAlignedEntity() || !this.checkedPath.equals(getComparePath(path))) { + throw new IllegalArgumentException( + "The paths in the predicate are not compatible with each other."); } } private PartialPath getComparePath(MeasurementPath path) { - if (path.isUnderAlignedEntity()) return path.getDevicePath(); - return path; + return path.isUnderAlignedEntity() ? path.getDevicePath() : path; } } @@ -94,13 +89,6 @@ public class SchemaCompatibilityValidator return null; } - @Override - public Void visitGroupByTimeExpression( - GroupByTimeExpression groupByTimeExpression, Context context) { - // needn't check - return null; - } - @Override public Void visitInExpression(InExpression inExpression, Context context) { return inExpression.getExpression().accept(this, context); @@ -191,4 +179,11 @@ public class SchemaCompatibilityValidator betweenExpression.getThirdExpression().accept(this, context); return null; } + + @Override + public Void visitGroupByTimeExpression( + GroupByTimeExpression groupByTimeExpression, Context context) { + // needn't check + return null; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java index c4d6815c904..f74755bbb4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDown.java @@ -180,8 +180,8 @@ public class LimitOffsetPushDown implements PlanOptimizer { @Override public PlanNode visitSeriesScan(SeriesScanNode node, RewriterContext context) { if (context.isEnablePushDown()) { - node.setLimit(context.getLimit()); - node.setOffset(context.getOffset()); + node.setPushDownLimit(context.getLimit()); + node.setPushDownOffset(context.getOffset()); } return node; } @@ -189,8 +189,8 @@ public class LimitOffsetPushDown implements PlanOptimizer { @Override public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, RewriterContext context) { if (context.isEnablePushDown()) { - node.setLimit(context.getLimit()); - node.setOffset(context.getOffset()); + node.setPushDownLimit(context.getLimit()); + node.setPushDownOffset(context.getOffset()); } return node; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 5a76dca686d..d38a4c0e2cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -196,6 +196,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; @@ -289,24 +290,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP @Override public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) { - - SeriesScanOptions.Builder seriesScanOptionsBuilder = new SeriesScanOptions.Builder(); - Filter globalTimeFilter = context.getGlobalTimeFilter(); - seriesScanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter); - - Filter pushDownFilter = null; - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - pushDownFilter = - PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); - } - seriesScanOptionsBuilder.withPushDownFilter(pushDownFilter); - PartialPath seriesPath = node.getSeriesPath(); - seriesScanOptionsBuilder.withAllSensors( + + SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context); + scanOptionsBuilder.withAllSensors( context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement())); - seriesScanOptionsBuilder.withLimit(node.getLimit()); - seriesScanOptionsBuilder.withOffset(node.getOffset()); + scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); + scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); OperatorContext operatorContext = context @@ -321,7 +311,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), seriesPath, node.getScanOrder(), - seriesScanOptionsBuilder.build()); + scanOptionsBuilder.build()); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); @@ -333,23 +323,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP @Override public Operator visitAlignedSeriesScan( AlignedSeriesScanNode node, LocalExecutionPlanContext context) { - SeriesScanOptions.Builder seriesScanOptionsBuilder = new SeriesScanOptions.Builder(); - - Filter globalTimeFilter = context.getGlobalTimeFilter(); - seriesScanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter); - - Filter pushDownFilter = null; - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - pushDownFilter = - PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); - } - seriesScanOptionsBuilder.withPushDownFilter(pushDownFilter); - - seriesScanOptionsBuilder.withLimit(node.getLimit()); - seriesScanOptionsBuilder.withOffset(node.getOffset()); AlignedPath seriesPath = node.getAlignedPath(); - seriesScanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList())); + + SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context); + scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); + scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); + scanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList())); OperatorContext operatorContext = context @@ -364,7 +343,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), seriesPath, node.getScanOrder(), - seriesScanOptionsBuilder.build(), + scanOptionsBuilder.build(), node.isQueryAllSensors()); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); @@ -402,18 +381,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP AggregationUtil.calculateMaxAggregationResultSize( node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider()); - SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); - Filter globalTimeFilter = context.getGlobalTimeFilter(); - scanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter); - - Filter pushDownFilter = null; - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - pushDownFilter = - PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); - } - scanOptionsBuilder.withPushDownFilter(pushDownFilter); - + SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context); scanOptionsBuilder.withAllSensors( context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement())); @@ -498,18 +466,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP AggregationUtil.calculateMaxAggregationResultSize( node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider()); - SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); - Filter globalTimeFilter = context.getGlobalTimeFilter(); - scanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter); - - Filter pushDownFilter = null; - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - pushDownFilter = - PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); - } - scanOptionsBuilder.withPushDownFilter(pushDownFilter); - + SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context); scanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList())); OperatorContext operatorContext = @@ -539,6 +496,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return seriesAggregationScanOperator; } + private SeriesScanOptions.Builder getSeriesScanOptionsBuilder( + SeriesSourceNode node, LocalExecutionPlanContext context) { + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + + Filter globalTimeFilter = context.getGlobalTimeFilter(); + scanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter); + + Filter pushDownFilter = null; + Expression pushDownPredicate = node.getPushDownPredicate(); + if (pushDownPredicate != null) { + PredicateUtils.validateSchemaCompatibility(pushDownPredicate, node.getPartitionPath()); + pushDownFilter = + PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); + } + scanOptionsBuilder.withPushDownFilter(pushDownFilter); + return scanOptionsBuilder; + } + @Override public Operator visitSchemaQueryOrderByHeat( SchemaQueryOrderByHeatNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index ee6f2525a22..4d516f720ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -99,7 +99,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter boxValue.add(String.format("SeriesScan-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Series: %s", node.getSeriesPath())); - long limit = node.getLimit(), offset = node.getOffset(); + long limit = node.getPushDownLimit(), offset = node.getPushDownOffset(); if (limit > 0) { boxValue.add(String.format("Limit: %s", limit)); } @@ -119,7 +119,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter "Series: %s%s", node.getAlignedPath().getDevice(), node.getAlignedPath().getMeasurementList())); - long limit = node.getLimit(), offset = node.getOffset(); + long limit = node.getPushDownLimit(), offset = node.getPushDownOffset(); if (limit > 0) { boxValue.add(String.format("Limit: %s", limit)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java index 496fe275941..4038ae2d29a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java @@ -222,6 +222,6 @@ public class AlignedLastQueryScanNode extends LastSeriesSourceNode { @Override public PartialPath getPartitionPath() { - return seriesPath; + return getSeriesPath(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index 3e22b13a5f4..5787490580e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -57,11 +57,11 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { // push down predicate for current series, could be null if it doesn't exist @Nullable private Expression pushDownPredicate; - // Limit for result set. The default value is -1, which means no limit - private long limit; + // push down limit for result set. The default value is -1, which means no limit + private long pushDownLimit; - // offset for result set. The default value is 0 - private long offset; + // push down offset for result set. The default value is 0 + private long pushDownOffset; // used for limit and offset push down optimizer, if we select all columns from aligned device, we // can use statistics to skip @@ -86,13 +86,13 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { PlanNodeId id, AlignedPath alignedPath, Ordering scanOrder, - long limit, - long offset, + long pushDownLimit, + long pushDownOffset, TRegionReplicaSet dataRegionReplicaSet, boolean lastLevelUseWildcard) { this(id, alignedPath, scanOrder, lastLevelUseWildcard); - this.limit = limit; - this.offset = offset; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; this.regionReplicaSet = dataRegionReplicaSet; } @@ -101,14 +101,14 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { AlignedPath alignedPath, Ordering scanOrder, @Nullable Expression pushDownPredicate, - long limit, - long offset, + long pushDownLimit, + long pushDownOffset, TRegionReplicaSet dataRegionReplicaSet, boolean lastLevelUseWildcard) { this(id, alignedPath, scanOrder, lastLevelUseWildcard); this.pushDownPredicate = pushDownPredicate; - this.limit = limit; - this.offset = offset; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; this.regionReplicaSet = dataRegionReplicaSet; } @@ -121,6 +121,7 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { } @Nullable + @Override public Expression getPushDownPredicate() { return pushDownPredicate; } @@ -129,20 +130,20 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { this.pushDownPredicate = pushDownPredicate; } - public long getLimit() { - return limit; + public long getPushDownLimit() { + return pushDownLimit; } - public long getOffset() { - return offset; + public long getPushDownOffset() { + return pushDownOffset; } - public void setLimit(long limit) { - this.limit = limit; + public void setPushDownLimit(long pushDownLimit) { + this.pushDownLimit = pushDownLimit; } - public void setOffset(long offset) { - this.offset = offset; + public void setPushDownOffset(long pushDownOffset) { + this.pushDownOffset = pushDownOffset; } @Override @@ -191,8 +192,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { getAlignedPath(), getScanOrder(), getPushDownPredicate(), - getLimit(), - getOffset(), + getPushDownLimit(), + getPushDownOffset(), this.regionReplicaSet, this.queryAllSensors); } @@ -223,8 +224,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { ReadWriteIOUtils.write((byte) 1, byteBuffer); Expression.serialize(pushDownPredicate, byteBuffer); } - ReadWriteIOUtils.write(limit, byteBuffer); - ReadWriteIOUtils.write(offset, byteBuffer); + ReadWriteIOUtils.write(pushDownLimit, byteBuffer); + ReadWriteIOUtils.write(pushDownOffset, byteBuffer); ReadWriteIOUtils.write(queryAllSensors, byteBuffer); } @@ -239,8 +240,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { ReadWriteIOUtils.write((byte) 1, stream); Expression.serialize(pushDownPredicate, stream); } - ReadWriteIOUtils.write(limit, stream); - ReadWriteIOUtils.write(offset, stream); + ReadWriteIOUtils.write(pushDownLimit, stream); + ReadWriteIOUtils.write(pushDownOffset, stream); ReadWriteIOUtils.write(queryAllSensors, stream); } @@ -279,8 +280,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { return false; } AlignedSeriesScanNode that = (AlignedSeriesScanNode) o; - return limit == that.limit - && offset == that.offset + return pushDownLimit == that.pushDownLimit + && pushDownOffset == that.pushDownOffset && alignedPath.equals(that.alignedPath) && scanOrder == that.scanOrder && Objects.equals(pushDownPredicate, that.pushDownPredicate) @@ -295,8 +296,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { alignedPath, scanOrder, pushDownPredicate, - limit, - offset, + pushDownLimit, + pushDownOffset, regionReplicaSet, queryAllSensors); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index b1bfc1af008..9c59f44c373 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -214,7 +214,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override public PartialPath getPartitionPath() { - return seriesPath; + return getSeriesPath(); } public String outputPathSymbol() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java index e3c313f6c56..fc54cd2f09a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import java.util.Objects; @@ -60,4 +61,10 @@ public abstract class LastSeriesSourceNode extends SeriesSourceNode { public int hashCode() { return Objects.hash(super.hashCode(), dataNodeSeriesScanNum); } + + @Override + public Expression getPushDownPredicate() { + // unsupported + return null; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java index cadc0735f87..764e258e75a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java @@ -67,6 +67,7 @@ public abstract class SeriesAggregationSourceNode extends SeriesSourceNode { } @Nullable + @Override public Expression getPushDownPredicate() { return pushDownPredicate; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java index 64eb9d3612d..1d16d658e63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java @@ -58,14 +58,14 @@ public class SeriesScanNode extends SeriesSourceNode { // The default order is TIMESTAMP_ASC, which means "order by timestamp asc" private Ordering scanOrder = Ordering.ASC; - // value filter for current series, could be null if doesn't exist + // push down predicate for current series, could be null if doesn't exist @Nullable private Expression pushDownPredicate; - // Limit for result set. The default value is -1, which means no limit - private long limit; + // push down limit for result set. The default value is -1, which means no limit + private long pushDownLimit; - // offset for result set. The default value is 0 - private long offset; + // push down offset for result set. The default value is 0 + private long pushDownOffset; // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; @@ -84,12 +84,12 @@ public class SeriesScanNode extends SeriesSourceNode { PlanNodeId id, MeasurementPath seriesPath, Ordering scanOrder, - long limit, - long offset, + long pushDownLimit, + long pushDownOffset, TRegionReplicaSet dataRegionReplicaSet) { this(id, seriesPath, scanOrder); - this.limit = limit; - this.offset = offset; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; this.regionReplicaSet = dataRegionReplicaSet; } @@ -98,13 +98,13 @@ public class SeriesScanNode extends SeriesSourceNode { MeasurementPath seriesPath, Ordering scanOrder, @Nullable Expression pushDownPredicate, - long limit, - long offset, + long pushDownLimit, + long pushDownOffset, TRegionReplicaSet dataRegionReplicaSet) { this(id, seriesPath, scanOrder); this.pushDownPredicate = pushDownPredicate; - this.limit = limit; - this.offset = offset; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; this.regionReplicaSet = dataRegionReplicaSet; } @@ -124,20 +124,20 @@ public class SeriesScanNode extends SeriesSourceNode { this.regionReplicaSet = dataRegion; } - public long getLimit() { - return limit; + public long getPushDownLimit() { + return pushDownLimit; } - public long getOffset() { - return offset; + public long getPushDownOffset() { + return pushDownOffset; } - public void setLimit(long limit) { - this.limit = limit; + public void setPushDownLimit(long pushDownLimit) { + this.pushDownLimit = pushDownLimit; } - public void setOffset(long offset) { - this.offset = offset; + public void setPushDownOffset(long pushDownOffset) { + this.pushDownOffset = pushDownOffset; } public Ordering getScanOrder() { @@ -153,6 +153,7 @@ public class SeriesScanNode extends SeriesSourceNode { } @Nullable + @Override public Expression getPushDownPredicate() { return pushDownPredicate; } @@ -183,8 +184,8 @@ public class SeriesScanNode extends SeriesSourceNode { getSeriesPath(), getScanOrder(), getPushDownPredicate(), - getLimit(), - getOffset(), + getPushDownLimit(), + getPushDownOffset(), this.regionReplicaSet); } @@ -209,8 +210,8 @@ public class SeriesScanNode extends SeriesSourceNode { ReadWriteIOUtils.write((byte) 1, byteBuffer); Expression.serialize(pushDownPredicate, byteBuffer); } - ReadWriteIOUtils.write(limit, byteBuffer); - ReadWriteIOUtils.write(offset, byteBuffer); + ReadWriteIOUtils.write(pushDownLimit, byteBuffer); + ReadWriteIOUtils.write(pushDownOffset, byteBuffer); } @Override @@ -224,8 +225,8 @@ public class SeriesScanNode extends SeriesSourceNode { ReadWriteIOUtils.write((byte) 1, stream); Expression.serialize(pushDownPredicate, stream); } - ReadWriteIOUtils.write(limit, stream); - ReadWriteIOUtils.write(offset, stream); + ReadWriteIOUtils.write(pushDownLimit, stream); + ReadWriteIOUtils.write(pushDownOffset, stream); } public static SeriesScanNode deserialize(ByteBuffer byteBuffer) { @@ -264,8 +265,8 @@ public class SeriesScanNode extends SeriesSourceNode { return false; } SeriesScanNode that = (SeriesScanNode) o; - return limit == that.limit - && offset == that.offset + return pushDownLimit == that.pushDownLimit + && pushDownOffset == that.pushDownOffset && seriesPath.equals(that.seriesPath) && scanOrder == that.scanOrder && Objects.equals(pushDownPredicate, that.pushDownPredicate) @@ -279,8 +280,8 @@ public class SeriesScanNode extends SeriesSourceNode { seriesPath, scanOrder, pushDownPredicate, - limit, - offset, + pushDownLimit, + pushDownOffset, regionReplicaSet); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java index 18ebb71679e..a87d5970766 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesSourceNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; public abstract class SeriesSourceNode extends SourceNode { @@ -28,4 +29,6 @@ public abstract class SeriesSourceNode extends SourceNode { } public abstract PartialPath getPartitionPath(); + + public abstract Expression getPushDownPredicate(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java index 8cd69d57255..e509a15385d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java @@ -37,21 +37,21 @@ public class SeriesScanOptions { private Filter pushDownFilter; - private final long limit; - private final long offset; + private final long pushDownLimit; + private final long pushDownOffset; private final Set<String> allSensors; public SeriesScanOptions( Filter globalTimeFilter, Filter pushDownFilter, - long limit, - long offset, + long pushDownLimit, + long pushDownOffset, Set<String> allSensors) { this.globalTimeFilter = globalTimeFilter; this.pushDownFilter = pushDownFilter; - this.limit = limit; - this.offset = offset; + this.pushDownLimit = pushDownLimit; + this.pushDownOffset = pushDownOffset; this.allSensors = allSensors; } @@ -73,20 +73,12 @@ public class SeriesScanOptions { return pushDownFilter; } - public long getLimit() { - return limit; - } - - public long getOffset() { - return offset; - } - public Set<String> getAllSensors() { return allSensors; } public PaginationController getPaginationController() { - return new PaginationController(limit, offset); + return new PaginationController(pushDownLimit, pushDownOffset); } public void setTTL(long dataTTL) { @@ -113,8 +105,8 @@ public class SeriesScanOptions { private Filter globalTimeFilter = null; private Filter pushDownFilter = null; - private long limit = 0L; - private long offset = 0L; + private long pushDownLimit = 0L; + private long pushDownOffset = 0L; private Set<String> allSensors; @@ -128,13 +120,13 @@ public class SeriesScanOptions { return this; } - public Builder withLimit(long limit) { - this.limit = limit; + public Builder withPushDownLimit(long pushDownLimit) { + this.pushDownLimit = pushDownLimit; return this; } - public Builder withOffset(long offset) { - this.offset = offset; + public Builder withPushDownOffset(long pushDownOffset) { + this.pushDownOffset = pushDownOffset; return this; } @@ -143,7 +135,7 @@ public class SeriesScanOptions { } public SeriesScanOptions build() { - return new SeriesScanOptions(globalTimeFilter, pushDownFilter, limit, offset, allSensors); + return new SeriesScanOptions(globalTimeFilter, pushDownFilter, pushDownLimit, pushDownOffset, allSensors); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java index eda0995d7c0..699e7b265be 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java @@ -76,16 +76,16 @@ public class TestPlanBuilder { public TestPlanBuilder scan(String id, PartialPath path, int limit, int offset) { SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path); - node.setLimit(limit); - node.setOffset(offset); + node.setPushDownLimit(limit); + node.setPushDownOffset(offset); this.root = node; return this; } public TestPlanBuilder scanAligned(String id, PartialPath path, int limit, int offset) { AlignedSeriesScanNode node = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path); - node.setLimit(limit); - node.setOffset(offset); + node.setPushDownLimit(limit); + node.setPushDownOffset(offset); this.root = node; return this; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java index 18953d210d5..6da7ef8ae69 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/AlignByDeviceOrderByLimitOffsetTest.java @@ -267,9 +267,9 @@ public class AlignByDeviceOrderByLimitOffsetTest { private void assertScanNodeLimitValue(PlanNode root, long limitValue) { for (PlanNode node : root.getChildren()) { if (node instanceof SeriesScanNode) { - assertEquals(limitValue, ((SeriesScanNode) node).getLimit()); + assertEquals(limitValue, ((SeriesScanNode) node).getPushDownLimit()); } else if (node instanceof AlignedSeriesScanNode) { - assertEquals(limitValue, ((AlignedSeriesScanNode) node).getLimit()); + assertEquals(limitValue, ((AlignedSeriesScanNode) node).getPushDownLimit()); } else { assertScanNodeLimitValue(node, limitValue); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanLimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanLimitOffsetPushDownTest.java index d9b621e9c3f..dc54256eff7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanLimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/AlignedSeriesScanLimitOffsetPushDownTest.java @@ -453,8 +453,8 @@ public class AlignedSeriesScanLimitOffsetPushDownTest { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(new HashSet<>(scanPath.getMeasurementList())); - scanOptionsBuilder.withLimit(limit); - scanOptionsBuilder.withOffset(offset); + scanOptionsBuilder.withPushDownLimit(limit); + scanOptionsBuilder.withPushDownOffset(offset); AlignedSeriesScanUtil seriesScanUtil = new AlignedSeriesScanUtil( scanPath, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java index 14dcc3832f6..b95f844b076 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/series/SeriesScanLimitOffsetPushDownTest.java @@ -247,8 +247,8 @@ public class SeriesScanLimitOffsetPushDownTest { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(Collections.singleton(scanPath.getMeasurement())); - scanOptionsBuilder.withLimit(limit); - scanOptionsBuilder.withOffset(offset); + scanOptionsBuilder.withPushDownLimit(limit); + scanOptionsBuilder.withPushDownOffset(offset); SeriesScanUtil seriesScanUtil = new SeriesScanUtil( scanPath,
