This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/refactorFilter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ea2d091e3f3c33a5acab78756837c611dd011388 Author: Minghui Liu <[email protected]> AuthorDate: Sun Nov 19 16:56:36 2023 +0800 self-review --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 6 +- .../db/queryengine/common/MPPQueryContext.java | 2 +- .../fragment/FragmentInstanceContext.java | 29 +++-- .../fragment/FragmentInstanceManager.java | 2 +- .../operator/source/AlignedSeriesScanUtil.java | 4 +- .../execution/operator/source/SeriesScanUtil.java | 8 +- .../db/queryengine/plan/analyze/Analysis.java | 15 +-- .../queryengine/plan/analyze/AnalyzeVisitor.java | 35 +++--- .../plan/analyze/ExpressionAnalyzer.java | 19 +-- .../queryengine/plan/analyze/ExpressionUtils.java | 4 +- .../queryengine/plan/analyze/PredicateUtils.java | 35 ++++++ .../ExtractGlobalTimePredicateVisitor.java | 136 +++++++++++++++++++++ .../plan/planner/LocalExecutionPlanContext.java | 2 +- .../plan/planner/OperatorTreeGenerator.java | 22 ++-- .../SimpleFragmentParallelPlanner.java | 4 +- .../distribution/WriteFragmentParallelPlanner.java | 4 +- .../plan/planner/plan/FragmentInstance.java | 34 +++--- .../source/AlignedSeriesAggregationScanNode.java | 12 +- .../plan/node/source/AlignedSeriesScanNode.java | 15 ++- .../node/source/SeriesAggregationScanNode.java | 12 +- .../planner/plan/node/source/SeriesScanNode.java | 7 +- .../planner/plan/parameter/SeriesScanOptions.java | 22 ++-- .../db/queryengine/plan/analyze/AnalyzeTest.java | 13 +- .../node/process/AggregationNodeSerdeTest.java | 7 +- 24 files changed, 314 insertions(+), 135 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index d5e49c09ecf..2c68184f8e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -633,7 +633,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSDataType dataType, boolean isAligned, long startTime, - long endTme, + long endTime, long interval, TAggregationType aggregationType, List<DataRegion> dataRegionList) @@ -645,7 +645,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); } - Filter timeFilter = TimeFilter.between(startTime, endTme - 1); + Filter timeFilter = TimeFilter.between(startTime, endTime - 1); QueryId queryId = new QueryId("stub_query"); FragmentInstanceId instanceId = @@ -675,7 +675,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter( - startTime, endTme, new TimeDuration(0, interval), new TimeDuration(0, interval), true); + startTime, endTime, new TimeDuration(0, interval), new TimeDuration(0, interval), true); IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType); AbstractSeriesAggregationScanOperator operator; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 8c0a1926bb3..94a5f753d3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -157,7 +157,7 @@ public class MPPQueryContext { public void generateGlobalTimeFilter(Analysis analysis) { this.globalTimeFilter = - ExpressionUtils.convertExpressionToTimeFilter(analysis.getGlobalTimeFilter()); + ExpressionUtils.convertPredicateToTimeFilter(analysis.getGlobalTimePredicate()); } public Filter getGlobalTimeFilter() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 7e5dd8ed319..3dcaea19f31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -25,8 +25,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionUtils; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.ConvertExpressionToTimeFilterVisitor; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; @@ -58,7 +58,7 @@ public class FragmentInstanceContext extends QueryContext { private final FragmentInstanceStateMachine stateMachine; private IDataRegionForQuery dataRegion; - private Filter timeFilter; + private Filter globalTimeFilter; private List<PartialPath> sourcePaths; // Shared by all scan operators in this fragment instance to avoid memory problem private QueryDataSource sharedQueryDataSource; @@ -111,11 +111,16 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo, IDataRegionForQuery dataRegion, - Expression timeFilter, + Expression globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) { FragmentInstanceContext instanceContext = new FragmentInstanceContext( - id, stateMachine, sessionInfo, dataRegion, timeFilter, dataNodeQueryContextMap); + id, + stateMachine, + sessionInfo, + dataRegion, + globalTimePredicate, + dataNodeQueryContextMap); instanceContext.initialize(); instanceContext.start(); return instanceContext; @@ -141,14 +146,14 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo, IDataRegionForQuery dataRegion, - Expression timeFilter, + Expression globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) { this.id = id; this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; this.dataRegion = dataRegion; - this.timeFilter = timeFilter.accept(new ConvertExpressionToTimeFilterVisitor(), null); + this.globalTimeFilter = ExpressionUtils.convertPredicateToTimeFilter(globalTimePredicate); this.dataNodeQueryContextMap = dataNodeQueryContextMap; this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId()); } @@ -168,13 +173,13 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo, IDataRegionForQuery dataRegion, - Filter timeFilter) { + Filter globalTimeFilter) { this.id = id; this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; this.dataRegion = dataRegion; - this.timeFilter = timeFilter; + this.globalTimeFilter = globalTimeFilter; this.dataNodeQueryContextMap = null; } @@ -299,8 +304,8 @@ public class FragmentInstanceContext extends QueryContext { return Optional.ofNullable(stateMachine.getFailureCauses().peek()); } - public Filter getTimeFilter() { - return timeFilter; + public Filter getGlobalTimeFilter() { + return globalTimeFilter; } public IDataRegionForQuery getDataRegion() { @@ -331,7 +336,7 @@ public class FragmentInstanceContext extends QueryContext { // filtered according to timeIndex selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null, this, - timeFilter); + globalTimeFilter); // used files should be added before mergeLock is unlocked, or they may be deleted by // running merge @@ -439,7 +444,7 @@ public class FragmentInstanceContext extends QueryContext { } dataRegion = null; - timeFilter = null; + globalTimeFilter = null; sourcePaths = null; sharedQueryDataSource = null; releaseDataNodeQueryContext(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index b9ead85cbb5..945a59d3894 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -151,7 +151,7 @@ public class FragmentInstanceManager { stateMachine, instance.getSessionInfo(), dataRegion, - instance.getTimeFilter(), + instance.getGlobalTimePredicate(), dataNodeQueryContextMap)); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index 2f31a6b58ed..6b9f206adce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -156,7 +156,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { if (firstTimeSeriesMetadata != null && !isFileOverlapped() && !firstTimeSeriesMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); Statistics statistics = firstTimeSeriesMetadata.getStatistics(); if (queryFilter == null || queryFilter.allSatisfy(statistics)) { skipOffsetByTimeSeriesMetadata(); @@ -204,7 +204,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { @Override protected void filterFirstChunkMetadata() throws IOException { if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); Statistics statistics = firstChunkMetadata.getStatistics(); if (queryFilter == null || queryFilter.allSatisfy(statistics)) { skipOffsetByChunkMetadata(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index f3f9b483134..b09bde7198f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -296,7 +296,7 @@ public class SeriesScanUtil { @SuppressWarnings("squid:S3740") protected void filterFirstChunkMetadata() throws IOException { if (firstChunkMetadata != null && !isChunkOverlapped() && !firstChunkMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); Statistics statistics = firstChunkMetadata.getStatistics(); if (queryFilter == null || queryFilter.allSatisfy(statistics)) { long rowCount = statistics.getCount(); @@ -646,7 +646,7 @@ public class SeriesScanUtil { return res; } else { // next page is not overlapped, push down value filter & limit offset - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); if (queryFilter != null) { firstPageReader.setFilter(queryFilter); } @@ -791,7 +791,7 @@ public class SeriesScanUtil { } } - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); if (queryFilter != null && !queryFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { continue; @@ -1051,7 +1051,7 @@ public class SeriesScanUtil { if (firstTimeSeriesMetadata != null && !isFileOverlapped() && !firstTimeSeriesMetadata.isModified()) { - Filter queryFilter = scanOptions.getQueryFilter(); + Filter queryFilter = scanOptions.getPushDownFilter(); Statistics statistics = firstTimeSeriesMetadata.getStatistics(); if (queryFilter == null || queryFilter.allSatisfy(statistics)) { long rowCount = statistics.getCount(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 12b54118e6d..31cfa51d747 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -177,8 +177,8 @@ public class Analysis { // indicate is there a value filter private boolean hasValueFilter = false; - // a global time filter used in `initQueryDataSource` and filter push down - private Expression globalTimeFilter; + // a global time predicate used in `initQueryDataSource` and filter push down + private Expression globalTimePredicate; // expression of output column to be calculated private Set<Expression> selectExpressions; @@ -330,12 +330,12 @@ public class Analysis { redirectNodeList.add(endPoint); } - public Expression getGlobalTimeFilter() { - return globalTimeFilter; + public Expression getGlobalTimePredicate() { + return globalTimePredicate; } - public void setGlobalTimeFilter(Expression timeFilter) { - this.globalTimeFilter = timeFilter; + public void setGlobalTimePredicate(Expression timeFilter) { + this.globalTimePredicate = timeFilter; } public DatasetHeader getRespDatasetHeader() { @@ -351,9 +351,6 @@ public class Analysis { if (expression.getExpressionType().equals(ExpressionType.NULL)) { return null; } - if (expression.getExpressionType().equals(ExpressionType.TIMESTAMP)) { - return TSDataType.INT64; - } TSDataType type = expressionTypes.get(NodeRef.of(expression)); checkArgument(type != null, "Expression is not analyzed: %s", expression); return type; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 3b1749d9ffd..c51933221e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -330,7 +330,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analyzeOutput(analysis, queryStatement, outputExpressions); // fetch partition information - analyzeDataPartition(analysis, queryStatement, schemaTree, context); + analyzeDataPartition(analysis, queryStatement, schemaTree, context.getGlobalTimeFilter()); } catch (StatementAnalyzeException e) { throw new StatementAnalyzeException( @@ -385,17 +385,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } private void analyzeGlobalTimeFilter(Analysis analysis, QueryStatement queryStatement) { - Expression globalTimeFilter = null; + Expression globalTimePredicate = null; boolean hasValueFilter = false; if (queryStatement.getWhereCondition() != null) { WhereCondition whereCondition = queryStatement.getWhereCondition(); Expression predicate = whereCondition.getPredicate(); Pair<Expression, Boolean> resultPair = - ExpressionAnalyzer.extractGlobalTimeFilter(predicate, true, true); - globalTimeFilter = resultPair.left; - if (globalTimeFilter != null) { - globalTimeFilter = ExpressionAnalyzer.predicateRemoveNot(globalTimeFilter); + ExpressionAnalyzer.extractGlobalTimePredicate(predicate, true, true); + globalTimePredicate = resultPair.left; + if (globalTimePredicate != null) { + globalTimePredicate = ExpressionAnalyzer.predicateRemoveNot(globalTimePredicate); } hasValueFilter = resultPair.right; @@ -410,7 +410,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> whereCondition.setPredicate(predicate); } } - analysis.setGlobalTimeFilter(globalTimeFilter); + analysis.setGlobalTimePredicate(globalTimePredicate); analysis.setHasValueFilter(hasValueFilter); } @@ -433,7 +433,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader()); // fetch partition information - analyzeDataPartition(analysis, queryStatement, schemaTree, context); + analyzeDataPartition(analysis, queryStatement, schemaTree, context.getGlobalTimeFilter()); return analysis; } @@ -1765,14 +1765,14 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(groupByTimeComponent); analysis.setGroupByTimeParameter(groupByTimeParameter); - Expression globalTimeFilter = analysis.getGlobalTimeFilter(); - Expression groupByExpression = ExpressionFactory.groupByTime(groupByTimeParameter); - if (globalTimeFilter == null) { - globalTimeFilter = groupByExpression; + Expression globalTimePredicate = analysis.getGlobalTimePredicate(); + Expression groupByTimePredicate = ExpressionFactory.groupByTime(groupByTimeParameter); + if (globalTimePredicate == null) { + globalTimePredicate = groupByTimePredicate; } else { - globalTimeFilter = ExpressionFactory.and(globalTimeFilter, groupByExpression); + globalTimePredicate = ExpressionFactory.and(globalTimePredicate, groupByTimePredicate); } - analysis.setGlobalTimeFilter(globalTimeFilter); + analysis.setGlobalTimePredicate(globalTimePredicate); } private void analyzeFill(Analysis analysis, QueryStatement queryStatement) { @@ -1789,7 +1789,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, - MPPQueryContext context) { + Filter globalTimeFilter) { Set<String> deviceSet = new HashSet<>(); if (queryStatement.isAlignByDevice()) { deviceSet = @@ -1802,7 +1802,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } } DataPartition dataPartition = - fetchDataPartitionByDevices(deviceSet, schemaTree, context.getGlobalTimeFilter()); + fetchDataPartitionByDevices(deviceSet, schemaTree, globalTimeFilter); analysis.setDataPartitionInfo(dataPartition); } @@ -2668,7 +2668,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Collections.singletonList( new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())), schemaTree); - analyzeDataPartition(analysis, new QueryStatement(), schemaTree, context); + analyzeDataPartition( + analysis, new QueryStatement(), schemaTree, context.getGlobalTimeFilter()); } analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java index 8859b7c59e7..44a7ba4ecb4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionAnalyzer.java @@ -470,21 +470,21 @@ public class ExpressionAnalyzer { } /** - * Extract global time filter from query filter. + * Extract global time predicate from query predicate. * - * @param predicate raw query filter + * @param predicate raw query predicate * @param canRewrite determined by the father of current expression * @param isFirstOr whether it is the first LogicOrExpression encountered - * @return global time filter + * @return global time predicate */ - public static Pair<Expression, Boolean> extractGlobalTimeFilter( + public static Pair<Expression, Boolean> extractGlobalTimePredicate( Expression predicate, boolean canRewrite, boolean isFirstOr) { if (predicate.getExpressionType().equals(ExpressionType.LOGIC_AND)) { Pair<Expression, Boolean> leftResultPair = - extractGlobalTimeFilter( + extractGlobalTimePredicate( ((BinaryExpression) predicate).getLeftExpression(), canRewrite, isFirstOr); Pair<Expression, Boolean> rightResultPair = - extractGlobalTimeFilter( + extractGlobalTimePredicate( ((BinaryExpression) predicate).getRightExpression(), canRewrite, isFirstOr); // rewrite predicate to avoid duplicate calculation on time filter @@ -513,9 +513,10 @@ public class ExpressionAnalyzer { return new Pair<>(null, true); } else if (predicate.getExpressionType().equals(ExpressionType.LOGIC_OR)) { Pair<Expression, Boolean> leftResultPair = - extractGlobalTimeFilter(((BinaryExpression) predicate).getLeftExpression(), false, false); + extractGlobalTimePredicate( + ((BinaryExpression) predicate).getLeftExpression(), false, false); Pair<Expression, Boolean> rightResultPair = - extractGlobalTimeFilter( + extractGlobalTimePredicate( ((BinaryExpression) predicate).getRightExpression(), false, false); if (leftResultPair.left != null && rightResultPair.left != null) { @@ -532,7 +533,7 @@ public class ExpressionAnalyzer { return new Pair<>(null, true); } else if (predicate.getExpressionType().equals(ExpressionType.LOGIC_NOT)) { Pair<Expression, Boolean> childResultPair = - extractGlobalTimeFilter( + extractGlobalTimePredicate( ((UnaryExpression) predicate).getExpression(), canRewrite, isFirstOr); if (childResultPair.left != null) { return new Pair<>(ExpressionFactory.not(childResultPair.left), childResultPair.right); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java index 4fac3b4c47b..b14512dedf4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java @@ -360,7 +360,7 @@ public class ExpressionUtils { } } - public static Filter convertExpressionToTimeFilter(Expression expression) { - return expression.accept(new ConvertExpressionToTimeFilterVisitor(), null); + public static Filter convertPredicateToTimeFilter(Expression predicate) { + return predicate.accept(new ConvertExpressionToTimeFilterVisitor(), null); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java new file mode 100644 index 00000000000..06230f7cdc3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.analyze; + +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.ConvertExpressionToFilterVisitor; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +public class PredicateUtils { + + private PredicateUtils() { + // util class + } + + public static Filter convertPredicateToFilter(Expression predicate, TypeProvider typeProvider) { + return predicate.accept(new ConvertExpressionToFilterVisitor(), typeProvider); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ExtractGlobalTimePredicateVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ExtractGlobalTimePredicateVisitor.java new file mode 100644 index 00000000000..91ea5c1cf73 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/visitor/predicate/ExtractGlobalTimePredicateVisitor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate; + +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.EqualToExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.GreaterEqualExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.GreaterThanExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.LessEqualExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.LessThanExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.LogicAndExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.LogicOrExpression; +import org.apache.iotdb.db.queryengine.plan.expression.binary.NonEqualExpression; +import org.apache.iotdb.db.queryengine.plan.expression.ternary.BetweenExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.FixedIntervalMultiRangeExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.InExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.IsNullExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.LogicNotExpression; +import org.apache.iotdb.db.queryengine.plan.expression.unary.RegularExpression; + +/** Extract global time predicate from query predicate. */ +public class ExtractGlobalTimePredicateVisitor + extends PredicateVisitor<Expression, ExtractGlobalTimePredicateVisitor.Context> { + + @Override + public Expression visitInExpression(InExpression inExpression, Context context) { + return null; + } + + @Override + public Expression visitFixedIntervalMultiRangeExpression( + FixedIntervalMultiRangeExpression fixedIntervalMultiRangeExpression, Context context) { + return null; + } + + @Override + public Expression visitIsNullExpression(IsNullExpression isNullExpression, Context context) { + return null; + } + + @Override + public Expression visitLikeExpression(LikeExpression likeExpression, Context context) { + return null; + } + + @Override + public Expression visitRegularExpression(RegularExpression regularExpression, Context context) { + return null; + } + + @Override + public Expression visitLogicNotExpression( + LogicNotExpression logicNotExpression, Context context) { + return null; + } + + @Override + public Expression visitLogicAndExpression( + LogicAndExpression logicAndExpression, Context context) { + return null; + } + + @Override + public Expression visitLogicOrExpression(LogicOrExpression logicOrExpression, Context context) { + return null; + } + + @Override + public Expression visitEqualToExpression(EqualToExpression equalToExpression, Context context) { + return null; + } + + @Override + public Expression visitNonEqualExpression( + NonEqualExpression nonEqualExpression, Context context) { + return null; + } + + @Override + public Expression visitGreaterThanExpression( + GreaterThanExpression greaterThanExpression, Context context) { + return null; + } + + @Override + public Expression visitGreaterEqualExpression( + GreaterEqualExpression greaterEqualExpression, Context context) { + return null; + } + + @Override + public Expression visitLessThanExpression( + LessThanExpression lessThanExpression, Context context) { + return null; + } + + @Override + public Expression visitLessEqualExpression( + LessEqualExpression lessEqualExpression, Context context) { + return null; + } + + @Override + public Expression visitBetweenExpression(BetweenExpression betweenExpression, Context context) { + return null; + } + + protected static class Context { + + boolean hasValueFilter = false; + + // determined by the father of current expression + boolean canRewrite = false; + + // whether it is the first LogicOrExpression encountered + boolean isFirstOr = true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index 0978dfe594c..08a0b6f1c23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -293,6 +293,6 @@ public class LocalExecutionPlanContext { } public Filter getGlobalTimeFilter() { - return driverContext.getFragmentInstanceContext().getTimeFilter(); + return driverContext.getFragmentInstanceContext().getGlobalTimeFilter(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index a23cc3d6d30..c0a99e9a4c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -133,13 +133,13 @@ import org.apache.iotdb.db.queryengine.execution.operator.window.WindowParameter import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer; +import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor; -import org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.ConvertExpressionToFilterVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.CountSchemaMergeNode; @@ -290,10 +290,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { pushDownFilter = - node.getPushDownPredicate() - .accept(new ConvertExpressionToFilterVisitor(), context.getTypeProvider()); + PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); } - seriesScanOptionsBuilder.withQueryFilter(pushDownFilter); + seriesScanOptionsBuilder.withPushDownFilter(pushDownFilter); PartialPath seriesPath = node.getSeriesPath(); seriesScanOptionsBuilder.withAllSensors( @@ -335,10 +334,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { pushDownFilter = - node.getPushDownPredicate() - .accept(new ConvertExpressionToFilterVisitor(), context.getTypeProvider()); + PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); } - seriesScanOptionsBuilder.withQueryFilter(pushDownFilter); + seriesScanOptionsBuilder.withPushDownFilter(pushDownFilter); seriesScanOptionsBuilder.withLimit(node.getLimit()); seriesScanOptionsBuilder.withOffset(node.getOffset()); @@ -404,10 +402,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { pushDownFilter = - node.getPushDownPredicate() - .accept(new ConvertExpressionToFilterVisitor(), context.getTypeProvider()); + PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); } - scanOptionsBuilder.withQueryFilter(pushDownFilter); + scanOptionsBuilder.withPushDownFilter(pushDownFilter); scanOptionsBuilder.withAllSensors( context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement())); @@ -501,10 +498,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { pushDownFilter = - node.getPushDownPredicate() - .accept(new ConvertExpressionToFilterVisitor(), context.getTypeProvider()); + PredicateUtils.convertPredicateToFilter(pushDownPredicate, context.getTypeProvider()); } - scanOptionsBuilder.withQueryFilter(pushDownFilter); + scanOptionsBuilder.withPushDownFilter(pushDownFilter); scanOptionsBuilder.withAllSensors(new HashSet<>(seriesPath.getMeasurementList())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index b3ba0bf1475..5d7bb04fdc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -135,12 +135,12 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { } private void produceFragmentInstance(PlanFragment fragment) { - Expression timeFilter = analysis.getGlobalTimeFilter(); + Expression globalTimePredicate = analysis.getGlobalTimePredicate(); FragmentInstance fragmentInstance = new FragmentInstance( fragment, fragment.getId().genFragmentInstanceId(), - timeFilter, + globalTimePredicate, queryContext.getQueryType(), queryContext.getTimeOut(), queryContext.getSession(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java index 141ce3ccfd0..36cb4ecc9d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java @@ -49,7 +49,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { @Override public List<FragmentInstance> parallelPlan() { PlanFragment fragment = subPlan.getPlanFragment(); - Expression timeFilter = analysis.getGlobalTimeFilter(); + Expression globalTimePredicate = analysis.getGlobalTimePredicate(); PlanNode node = fragment.getPlanNodeTree(); if (!(node instanceof WritePlanNode)) { throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation"); @@ -61,7 +61,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { new FragmentInstance( new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), - timeFilter, + globalTimePredicate, queryContext.getQueryType(), queryContext.getTimeOut(), queryContext.getSession()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 00fc2978a50..ae83fd6623a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -60,7 +60,7 @@ public class FragmentInstance implements IConsensusRequest { private TDataNodeLocation hostDataNode; - private final Expression timeFilter; + private final Expression globalTimePredicate; private final long timeOut; @@ -82,12 +82,12 @@ public class FragmentInstance implements IConsensusRequest { public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression timeFilter, + Expression globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo) { this.fragment = fragment; - this.timeFilter = timeFilter; + this.globalTimePredicate = globalTimePredicate; this.id = id; this.type = type; this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold(); @@ -98,24 +98,24 @@ public class FragmentInstance implements IConsensusRequest { public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression timeFilter, + Expression globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo, boolean isRoot) { - this(fragment, id, timeFilter, type, timeOut, sessionInfo); + this(fragment, id, globalTimePredicate, type, timeOut, sessionInfo); this.isRoot = isRoot; } public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression timeFilter, + Expression globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo, int dataNodeFINum) { - this(fragment, id, timeFilter, type, timeOut, sessionInfo); + this(fragment, id, globalTimePredicate, type, timeOut, sessionInfo); this.dataNodeFINum = dataNodeFINum; } @@ -166,8 +166,8 @@ public class FragmentInstance implements IConsensusRequest { isHighestPriority = highestPriority; } - public Expression getTimeFilter() { - return timeFilter; + public Expression getGlobalTimePredicate() { + return globalTimePredicate; } public QueryType getType() { @@ -207,13 +207,13 @@ public class FragmentInstance implements IConsensusRequest { long timeOut = ReadWriteIOUtils.readLong(buffer); boolean hasSessionInfo = ReadWriteIOUtils.readBool(buffer); SessionInfo sessionInfo = hasSessionInfo ? SessionInfo.deserializeFrom(buffer) : null; - boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer); - Expression timeFilter = hasTimeFilter ? Expression.deserialize(buffer) : null; + boolean hasTimePredicate = ReadWriteIOUtils.readBool(buffer); + Expression globalTimePredicate = hasTimePredicate ? Expression.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; int dataNodeFINum = ReadWriteIOUtils.readInt(buffer); FragmentInstance fragmentInstance = new FragmentInstance( - planFragment, id, timeFilter, queryType, timeOut, sessionInfo, dataNodeFINum); + planFragment, id, globalTimePredicate, queryType, timeOut, sessionInfo, dataNodeFINum); boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer); fragmentInstance.hostDataNode = hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; @@ -230,9 +230,9 @@ public class FragmentInstance implements IConsensusRequest { if (sessionInfo != null) { sessionInfo.serialize(outputStream); } - ReadWriteIOUtils.write(timeFilter != null, outputStream); - if (timeFilter != null) { - Expression.serialize(timeFilter, outputStream); + ReadWriteIOUtils.write(globalTimePredicate != null, outputStream); + if (globalTimePredicate != null) { + Expression.serialize(globalTimePredicate, outputStream); } ReadWriteIOUtils.write(type.ordinal(), outputStream); ReadWriteIOUtils.write(dataNodeFINum, outputStream); @@ -257,12 +257,12 @@ public class FragmentInstance implements IConsensusRequest { && Objects.equals(fragment, instance.fragment) && Objects.equals(executorType, instance.executorType) && Objects.equals(hostDataNode, instance.hostDataNode) - && Objects.equals(timeFilter, instance.timeFilter); + && Objects.equals(globalTimePredicate, instance.globalTimePredicate); } @Override public int hashCode() { - return Objects.hash(id, type, fragment, executorType, hostDataNode, timeFilter); + return Objects.hash(id, type, fragment, executorType, hostDataNode, globalTimePredicate); } public TDataNodeLocation getHostDataNode() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java index 18cbb6c2ef0..5d8f7f8f3d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java @@ -81,11 +81,11 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod AlignedPath alignedPath, List<AggregationDescriptor> aggregationDescriptorList, Ordering scanOrder, - @Nullable Expression valueFilter, + @Nullable Expression pushDownPredicate, @Nullable GroupByTimeParameter groupByTimeParameter, TRegionReplicaSet dataRegionReplicaSet) { this(id, alignedPath, aggregationDescriptorList, scanOrder, groupByTimeParameter); - this.pushDownPredicate = valueFilter; + this.pushDownPredicate = pushDownPredicate; this.regionReplicaSet = dataRegionReplicaSet; } @@ -214,9 +214,9 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod } Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; byte isNull = ReadWriteIOUtils.readByte(byteBuffer); - Expression valueFilter = null; + Expression pushDownPredicate = null; if (isNull == 1) { - valueFilter = Expression.deserialize(byteBuffer); + pushDownPredicate = Expression.deserialize(byteBuffer); } isNull = ReadWriteIOUtils.readByte(byteBuffer); GroupByTimeParameter groupByTimeParameter = null; @@ -229,7 +229,7 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod alignedPath, aggregationDescriptorList, scanOrder, - valueFilter, + pushDownPredicate, groupByTimeParameter, null); } @@ -257,7 +257,7 @@ public class AlignedSeriesAggregationScanNode extends SeriesAggregationSourceNod @Override public PartialPath getPartitionPath() { - return alignedPath; + return getAlignedPath(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index b2297eb54e6..3e22b13a5f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -248,16 +248,23 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { AlignedPath alignedPath = (AlignedPath) PathDeserializeUtil.deserialize(byteBuffer); Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; byte isNull = ReadWriteIOUtils.readByte(byteBuffer); - Expression valueFilter = null; + Expression pushDownPredicate = null; if (isNull == 1) { - valueFilter = Expression.deserialize(byteBuffer); + pushDownPredicate = Expression.deserialize(byteBuffer); } long limit = ReadWriteIOUtils.readLong(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); boolean queryAllSensors = ReadWriteIOUtils.readBool(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new AlignedSeriesScanNode( - planNodeId, alignedPath, scanOrder, valueFilter, limit, offset, null, queryAllSensors); + planNodeId, + alignedPath, + scanOrder, + pushDownPredicate, + limit, + offset, + null, + queryAllSensors); } @Override @@ -304,6 +311,6 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { @Override public PartialPath getPartitionPath() { - return alignedPath; + return getAlignedPath(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java index 8bd9afe2f5d..71c15ad8bda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationScanNode.java @@ -93,11 +93,11 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { MeasurementPath seriesPath, List<AggregationDescriptor> aggregationDescriptorList, Ordering scanOrder, - @Nullable Expression valueFilter, + @Nullable Expression pushDownPredicate, @Nullable GroupByTimeParameter groupByTimeParameter, TRegionReplicaSet dataRegionReplicaSet) { this(id, seriesPath, aggregationDescriptorList, scanOrder, groupByTimeParameter); - this.pushDownPredicate = valueFilter; + this.pushDownPredicate = pushDownPredicate; this.regionReplicaSet = dataRegionReplicaSet; } @@ -221,9 +221,9 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { } Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; byte isNull = ReadWriteIOUtils.readByte(byteBuffer); - Expression valueFilter = null; + Expression pushDownPredicate = null; if (isNull == 1) { - valueFilter = Expression.deserialize(byteBuffer); + pushDownPredicate = Expression.deserialize(byteBuffer); } isNull = ReadWriteIOUtils.readByte(byteBuffer); GroupByTimeParameter groupByTimeParameter = null; @@ -236,7 +236,7 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { partialPath, aggregationDescriptorList, scanOrder, - valueFilter, + pushDownPredicate, groupByTimeParameter, null); } @@ -264,7 +264,7 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode { @Override public PartialPath getPartitionPath() { - return seriesPath; + return getSeriesPath(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java index 6451d8999fb..64eb9d3612d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesScanNode.java @@ -232,14 +232,15 @@ public class SeriesScanNode extends SeriesSourceNode { MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer); Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; byte isNull = ReadWriteIOUtils.readByte(byteBuffer); - Expression valueFilter = null; + Expression pushDownPredicate = null; if (isNull == 1) { - valueFilter = Expression.deserialize(byteBuffer); + pushDownPredicate = Expression.deserialize(byteBuffer); } long limit = ReadWriteIOUtils.readLong(byteBuffer); long offset = ReadWriteIOUtils.readLong(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new SeriesScanNode(planNodeId, partialPath, scanOrder, valueFilter, limit, offset, null); + return new SeriesScanNode( + planNodeId, partialPath, scanOrder, pushDownPredicate, limit, offset, null); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java index 953d7d07c5e..8cd69d57255 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java @@ -35,7 +35,7 @@ public class SeriesScanOptions { private Filter globalTimeFilter; - private Filter queryFilter; + private Filter pushDownFilter; private final long limit; private final long offset; @@ -44,12 +44,12 @@ public class SeriesScanOptions { public SeriesScanOptions( Filter globalTimeFilter, - Filter queryFilter, + Filter pushDownFilter, long limit, long offset, Set<String> allSensors) { this.globalTimeFilter = globalTimeFilter; - this.queryFilter = queryFilter; + this.pushDownFilter = pushDownFilter; this.limit = limit; this.offset = offset; this.allSensors = allSensors; @@ -69,8 +69,8 @@ public class SeriesScanOptions { return globalTimeFilter; } - public Filter getQueryFilter() { - return queryFilter; + public Filter getPushDownFilter() { + return pushDownFilter; } public long getLimit() { @@ -91,8 +91,8 @@ public class SeriesScanOptions { public void setTTL(long dataTTL) { this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL); - if (this.queryFilter != null) { - this.queryFilter = updateFilterUsingTTL(queryFilter, dataTTL); + if (this.pushDownFilter != null) { + this.pushDownFilter = updateFilterUsingTTL(pushDownFilter, dataTTL); } } @@ -112,7 +112,7 @@ public class SeriesScanOptions { public static class Builder { private Filter globalTimeFilter = null; - private Filter queryFilter = null; + private Filter pushDownFilter = null; private long limit = 0L; private long offset = 0L; @@ -123,8 +123,8 @@ public class SeriesScanOptions { return this; } - public Builder withQueryFilter(Filter queryFilter) { - this.queryFilter = queryFilter; + public Builder withPushDownFilter(Filter pushDownFilter) { + this.pushDownFilter = pushDownFilter; return this; } @@ -143,7 +143,7 @@ public class SeriesScanOptions { } public SeriesScanOptions build() { - return new SeriesScanOptions(globalTimeFilter, queryFilter, limit, offset, allSensors); + return new SeriesScanOptions(globalTimeFilter, pushDownFilter, limit, offset, allSensors); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeTest.java index d2998fb1c5a..9489ae54fed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeTest.java @@ -84,7 +84,7 @@ public class AnalyzeTest { Analysis actualAnalysis = analyzeSQL(sql); Analysis expectedAnalysis = new Analysis(); - expectedAnalysis.setGlobalTimeFilter(gt(time(), longValue(100))); + expectedAnalysis.setGlobalTimePredicate(gt(time(), longValue(100))); expectedAnalysis.setSelectExpressions( Sets.newHashSet( new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")), @@ -125,7 +125,7 @@ public class AnalyzeTest { + "where time >= 100 and s2 >= 10 and s2 != 6;"; actualAnalysis = analyzeSQL(sql); expectedAnalysis = new Analysis(); - expectedAnalysis.setGlobalTimeFilter(gt(time(), longValue(100))); + expectedAnalysis.setGlobalTimePredicate(gt(time(), longValue(100))); expectedAnalysis.setSelectExpressions( Sets.newHashSet( new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")), @@ -195,7 +195,7 @@ public class AnalyzeTest { Analysis actualAnalysis = analyzeSQL(sql); Analysis expectedAnalysis = new Analysis(); - expectedAnalysis.setGlobalTimeFilter( + expectedAnalysis.setGlobalTimePredicate( and(gt(time(), longValue(100)), groupByTime(10, 10, 0, 1000))); expectedAnalysis.setSelectExpressions( Sets.newHashSet( @@ -283,7 +283,7 @@ public class AnalyzeTest { Analysis actualAnalysis = analyzeSQL(sql); Analysis expectedAnalysis = new Analysis(); - expectedAnalysis.setGlobalTimeFilter(gt(time(), longValue(100))); + expectedAnalysis.setGlobalTimePredicate(gt(time(), longValue(100))); expectedAnalysis.setSelectExpressions( Sets.newHashSet( new TimeSeriesOperand( @@ -384,7 +384,7 @@ public class AnalyzeTest { Analysis actualAnalysis = analyzeSQL(sql); Analysis expectedAnalysis = new Analysis(); - expectedAnalysis.setGlobalTimeFilter( + expectedAnalysis.setGlobalTimePredicate( and(gt(time(), longValue(100)), groupByTime(10, 10, 0, 1000))); expectedAnalysis.setSelectExpressions( Sets.newHashSet( @@ -1115,7 +1115,8 @@ public class AnalyzeTest { assertEquals(expectedAnalysis.getSelectExpressions(), actualAnalysis.getSelectExpressions()); assertEquals(expectedAnalysis.getHavingExpression(), actualAnalysis.getHavingExpression()); assertEquals(expectedAnalysis.getRespDatasetHeader(), actualAnalysis.getRespDatasetHeader()); - assertEquals(expectedAnalysis.getGlobalTimeFilter(), actualAnalysis.getGlobalTimeFilter()); + assertEquals( + expectedAnalysis.getGlobalTimePredicate(), actualAnalysis.getGlobalTimePredicate()); } private void orderByAnalysisEqualTest(Analysis actualAnalysis, Analysis expectedAnalysis) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/AggregationNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/AggregationNodeSerdeTest.java index 2474246e037..47cc47fc721 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/AggregationNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/process/AggregationNodeSerdeTest.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.plan.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -44,6 +43,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.in; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries; import static org.junit.Assert.assertEquals; public class AggregationNodeSerdeTest { @@ -63,9 +64,7 @@ public class AggregationNodeSerdeTest { Collections.singletonList( new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))))), Ordering.ASC, - ExpressionFactory.in( - ExpressionFactory.timeSeries("root.sg.d1.s1"), - Sets.newLinkedHashSet(Arrays.asList("s1", "s2"))), + in(timeSeries("root.sg.d1.s1"), Sets.newLinkedHashSet(Arrays.asList("s1", "s2"))), groupByTimeParameter, null); AggregationNode aggregationNode =
