This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch SortPrune in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 25ce9b9e59820a719f2b324c69c85f1f66bdd519 Author: JackieTien97 <[email protected]> AuthorDate: Tue Nov 19 16:15:21 2024 +0800 Try to eliminate redundant Project and Sort For right table of Join clause in some self-join cases --- .../relational/aggregation/AccumulatorFactory.java | 15 +-- .../queryengine/plan/analyze/PredicateUtils.java | 9 +- .../plan/planner/TableOperatorGenerator.java | 92 ++++++++++++------- .../predicate/ConvertPredicateToFilterVisitor.java | 39 ++++---- .../ConvertPredicateToTimeFilterVisitor.java | 60 +++++++----- .../plan/relational/planner/PlanBuilder.java | 3 +- .../plan/relational/planner/QueryPlanner.java | 7 +- .../plan/relational/planner/RelationPlan.java | 14 ++- .../plan/relational/planner/RelationPlanner.java | 33 ++++--- .../ir/GlobalTimePredicateExtractVisitor.java | 37 ++++---- .../optimizations/PushPredicateIntoTableScan.java | 101 +++++++-------------- 11 files changed, 223 insertions(+), 187 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index f47c8121e76..8a44437ad9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -37,7 +37,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggr import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedVarianceAccumulator; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.tsfile.enums.TSDataType; @@ -45,9 +44,9 @@ import java.util.List; import java.util.Map; import static com.google.common.base.Preconditions.checkState; -import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.FIRST_BY; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST_BY; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; public class AccumulatorFactory { @@ -57,7 +56,8 @@ public class AccumulatorFactory { List<TSDataType> inputDataTypes, List<Expression> inputExpressions, Map<String, String> inputAttributes, - boolean ascending) { + boolean ascending, + String timeColumnName) { if (aggregationType == TAggregationType.UDAF) { // If UDAF accumulator receives raw input, it needs to check input's attribute throw new UnsupportedOperationException(); @@ -66,9 +66,9 @@ public class AccumulatorFactory { && inputExpressions.size() > 1) { boolean xIsTimeColumn = false; boolean yIsTimeColumn = false; - if (isTimeColumn(inputExpressions.get(1))) { + if (isTimeColumn(inputExpressions.get(1), timeColumnName)) { yIsTimeColumn = true; - } else if (isTimeColumn(inputExpressions.get(0))) { + } else if (isTimeColumn(inputExpressions.get(0), timeColumnName)) { xIsTimeColumn = true; } if (LAST_BY.getFunctionName().equals(functionName)) { @@ -326,9 +326,4 @@ public class AccumulatorFactory { public interface KeepEvaluator { boolean apply(long keep); } - - public static boolean isTimeColumn(Expression expression) { - return expression instanceof SymbolReference - && TIME_COLUMN_NAME.equals(((SymbolReference) expression).getName()); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java index 79f3c1ea485..9fb41fc152d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java @@ -309,16 +309,17 @@ public class PredicateUtils { public static Filter convertPredicateToFilter( org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression predicate, - List<String> allMeasurements, - Map<Symbol, ColumnSchema> schemaMap) { + Map<String, Integer> measurementColumnsIndexMap, + Map<Symbol, ColumnSchema> schemaMap, + String timeColumnName) { if (predicate == null) { return null; } return predicate.accept( new org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate - .ConvertPredicateToFilterVisitor(), + .ConvertPredicateToFilterVisitor(timeColumnName), new org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate - .ConvertPredicateToFilterVisitor.Context(allMeasurements, schemaMap)); + .ConvertPredicateToFilterVisitor.Context(measurementColumnsIndexMap, schemaMap)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index d5d3327dbf9..6fce84dabc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -176,12 +176,12 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.MEASUREMENT; +import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createAccumulator; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createGroupedAccumulator; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.ASC_TIME_COMPARATOR; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.IDENTITY_FILL; @@ -190,6 +190,7 @@ import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.getPreviousFill; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.getAggregationTypeByFuncName; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT; @@ -309,6 +310,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments(); Map<Symbol, Integer> idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap(); List<String> measurementColumnNames = new ArrayList<>(); + Map<String, Integer> measurementColumnsIndexMap = new HashMap<>(); + String timeColumnName = null; List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); int measurementColumnCount = 0; int idx = 0; @@ -327,14 +330,16 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution case MEASUREMENT: columnsIndexArray[idx++] = measurementColumnCount; measurementColumnCount++; - measurementColumnNames.add(columnName.getName()); + measurementColumnNames.add(schema.getName()); measurementSchemas.add( new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); columnSchemas.add(schema); + measurementColumnsIndexMap.put(columnName.getName(), measurementColumnCount - 1); break; case TIME: columnsIndexArray[idx++] = -1; columnSchemas.add(schema); + timeColumnName = columnName.getName(); break; default: throw new IllegalArgumentException( @@ -347,17 +352,20 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution if (!outputSet.contains(entry.getKey()) && entry.getValue().getColumnCategory() == MEASUREMENT) { measurementColumnCount++; - measurementColumnNames.add(entry.getKey().getName()); + measurementColumnNames.add(entry.getValue().getName()); measurementSchemas.add( new MeasurementSchema( entry.getValue().getName(), getTSDataType(entry.getValue().getType()))); + measurementColumnsIndexMap.put(entry.getKey().getName(), measurementColumnCount - 1); + } else if (entry.getValue().getColumnCategory() == TIME) { + timeColumnName = entry.getKey().getName(); } } SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate() - .map(timePredicate -> getSeriesScanOptionsBuilder(context, timePredicate)) - .orElse(new SeriesScanOptions.Builder()); + node.getTimePredicate().isPresent() + ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) + : new SeriesScanOptions.Builder(); scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); @@ -366,7 +374,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter(pushDownPredicate, measurementColumnNames, columnSchemaMap)); + convertPredicateToFilter( + pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); } OperatorContext operatorContext = @@ -1364,17 +1373,20 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution aggregationMap.get(symbol), node.getStep(), typeProvider, - true))); + true, + null))); return new AggregationOperator(context, child, aggregatorBuilder.build()); } + // timeColumnName will only be set for AggTableScan. private TableAggregator buildAggregator( Map<Symbol, Integer> childLayout, Symbol symbol, AggregationNode.Aggregation aggregation, AggregationNode.Step step, TypeProvider typeProvider, - boolean scanAscending) { + boolean scanAscending, + String timeColumnName) { List<Integer> argumentChannels = new ArrayList<>(); for (Expression argument : aggregation.getArguments()) { Symbol argumentSymbol = Symbol.from(argument); @@ -1393,7 +1405,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution originalArgumentTypes, aggregation.getArguments(), Collections.emptyMap(), - scanAscending); + scanAscending, + timeColumnName); return new TableAggregator( accumulator, @@ -1424,7 +1437,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution .forEach( (k, v) -> aggregatorBuilder.add( - buildAggregator(childLayout, k, v, node.getStep(), typeProvider, true))); + buildAggregator( + childLayout, k, v, node.getStep(), typeProvider, true, null))); return new StreamingAggregationOperator( operatorContext, @@ -1563,9 +1577,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution List<TableAggregator> aggregators = new ArrayList<>(node.getAggregations().size()); Map<Symbol, Integer> columnLayout = new HashMap<>(node.getAggregations().size()); - boolean[] ret = checkStatisticAndScanOrder(node); - boolean canUseStatistic = ret[0]; - boolean scanAscending = ret[1]; int distinctArgumentCount = node.getAssignments().size(); int aggregationsCount = node.getAggregations().size(); List<Integer> aggColumnIndexes = new ArrayList<>(); @@ -1577,6 +1588,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution List<ColumnSchema> columnSchemas = new ArrayList<>(aggregationsCount); int[] columnsIndexArray = new int[distinctArgumentCount]; List<String> measurementColumnNames = new ArrayList<>(); + Map<String, Integer> measurementColumnsIndexMap = new HashMap<>(); + String timeColumnName = null; List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : node.getAggregations().entrySet()) { @@ -1599,16 +1612,18 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution if (!columnLayout.containsKey(symbol)) { columnsIndexArray[channel] = measurementColumnCount; measurementColumnCount++; - measurementColumnNames.add(symbol.getName()); + measurementColumnNames.add(schema.getName()); measurementSchemas.add( new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); columnSchemas.add(schema); + measurementColumnsIndexMap.put(symbol.getName(), measurementColumnCount - 1); } break; case TIME: if (!columnLayout.containsKey(symbol)) { columnsIndexArray[channel] = -1; columnSchemas.add(schema); + timeColumnName = symbol.getName(); } break; default: @@ -1623,29 +1638,38 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution aggColumnIndexes.add(columnLayout.get(symbol)); } } - - aggregators.add( - buildAggregator( - columnLayout, - entry.getKey(), - entry.getValue(), - node.getStep(), - context.getTypeProvider(), - scanAscending)); } - // TODO if this needed? for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { if (!columnLayout.containsKey(entry.getKey()) && entry.getValue().getColumnCategory() == MEASUREMENT) { measurementColumnCount++; - measurementColumnNames.add(entry.getKey().getName()); + measurementColumnNames.add(entry.getValue().getName()); measurementSchemas.add( new MeasurementSchema( entry.getValue().getName(), getTSDataType(entry.getValue().getType()))); + measurementColumnsIndexMap.put(entry.getKey().getName(), measurementColumnCount - 1); + } else if (entry.getValue().getColumnCategory() == TIME) { + timeColumnName = entry.getKey().getName(); } } + boolean[] ret = checkStatisticAndScanOrder(node, timeColumnName); + boolean canUseStatistic = ret[0]; + boolean scanAscending = ret[1]; + + for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : node.getAggregations().entrySet()) { + aggregators.add( + buildAggregator( + columnLayout, + entry.getKey(), + entry.getValue(), + node.getStep(), + context.getTypeProvider(), + scanAscending, + timeColumnName)); + } + ITableTimeRangeIterator timeRangeIterator = null; List<ColumnSchema> groupingKeySchemas = null; int[] groupingKeyIndex = null; @@ -1699,9 +1723,9 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution node.getPlanNodeId(), AggregationTableScanNode.class.getSimpleName()); SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate() - .map(timePredicate -> getSeriesScanOptionsBuilder(context, timePredicate)) - .orElse(new SeriesScanOptions.Builder()); + node.getTimePredicate().isPresent() + ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) + : new SeriesScanOptions.Builder(); scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); @@ -1709,7 +1733,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution Expression pushDownPredicate = node.getPushDownPredicate(); if (pushDownPredicate != null) { scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter(pushDownPredicate, measurementColumnNames, columnSchemaMap)); + convertPredicateToFilter( + pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); } Set<String> allSensors = new HashSet<>(measurementColumnNames); @@ -1755,7 +1780,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return aggTableScanOperator; } - private boolean[] checkStatisticAndScanOrder(AggregationTableScanNode node) { + private boolean[] checkStatisticAndScanOrder( + AggregationTableScanNode node, String timeColumnName) { boolean canUseStatistic = true; int ascendingCount = 0, descendingCount = 0; @@ -1797,8 +1823,8 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution // only last_by(time, x) or last_by(x,time) can use statistic if ((LAST_BY_AGGREGATION.equals(funcName) || FIRST_BY_AGGREGATION.equals(funcName)) - && !isTimeColumn(aggregation.getArguments().get(0)) - && !isTimeColumn(aggregation.getArguments().get(1))) { + && !isTimeColumn(aggregation.getArguments().get(0), timeColumnName) + && !isTimeColumn(aggregation.getArguments().get(1), timeColumnName)) { canUseStatistic = false; } break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java index f1384a1997a..5879ac1ff0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java @@ -55,7 +55,8 @@ import org.apache.tsfile.read.filter.factory.FilterFactory; import org.apache.tsfile.read.filter.factory.ValueFilterApi; import org.apache.tsfile.utils.Binary; -import java.util.HashMap; +import javax.annotation.Nullable; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,23 +65,28 @@ import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression.getEscapeCharacter; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor.getLongValue; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isLiteral; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isSymbolReference; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn; public class ConvertPredicateToFilterVisitor extends PredicateVisitor<Filter, ConvertPredicateToFilterVisitor.Context> { - private static final ConvertPredicateToTimeFilterVisitor TIME_FILTER_VISITOR = - new ConvertPredicateToTimeFilterVisitor(); + @Nullable private final String timeColumnName; + private final ConvertPredicateToTimeFilterVisitor timeFilterVisitor; + + public ConvertPredicateToFilterVisitor(@Nullable String timeColumnName) { + this.timeColumnName = timeColumnName; + this.timeFilterVisitor = new ConvertPredicateToTimeFilterVisitor(); + } @Override protected Filter visitInPredicate(InPredicate node, Context context) { Expression operand = node.getValue(); - if (isTimeColumn(operand)) { - return TIME_FILTER_VISITOR.process(node, null); + if (isTimeColumn(operand, timeColumnName)) { + return timeFilterVisitor.process(node, null); } checkArgument(isSymbolReference(operand)); @@ -245,8 +251,9 @@ public class ConvertPredicateToFilterVisitor @Override protected Filter visitComparisonExpression(ComparisonExpression node, Context context) { - if (isTimeColumn(node.getLeft()) || isTimeColumn(node.getRight())) { - return TIME_FILTER_VISITOR.process(node, null); + if (isTimeColumn(node.getLeft(), timeColumnName) + || isTimeColumn(node.getRight(), timeColumnName)) { + return timeFilterVisitor.process(node, null); } Expression left = node.getLeft(); @@ -294,10 +301,10 @@ public class ConvertPredicateToFilterVisitor Expression secondExpression = node.getMin(); Expression thirdExpression = node.getMax(); - if (isTimeColumn(firstExpression) - || isTimeColumn(secondExpression) - || isTimeColumn(thirdExpression)) { - return TIME_FILTER_VISITOR.process(node, null); + if (isTimeColumn(firstExpression, timeColumnName) + || isTimeColumn(secondExpression, timeColumnName) + || isTimeColumn(thirdExpression, timeColumnName)) { + return timeFilterVisitor.process(node, null); } if (isSymbolReference(firstExpression) @@ -384,11 +391,9 @@ public class ConvertPredicateToFilterVisitor private final Map<String, Integer> measuremrntsMap; private final Map<Symbol, ColumnSchema> schemaMap; - public Context(List<String> allMeasurements, Map<Symbol, ColumnSchema> schemaMap) { - this.measuremrntsMap = new HashMap<>(allMeasurements.size()); - for (int i = 0, size = allMeasurements.size(); i < size; i++) { - measuremrntsMap.put(allMeasurements.get(i), i); - } + public Context( + Map<String, Integer> measurementColumnsIndexMap, Map<Symbol, ColumnSchema> schemaMap) { + this.measuremrntsMap = measurementColumnsIndexMap; this.schemaMap = schemaMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java index 2b09c1e01cc..b9ab2109a15 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.filter.factory.FilterFactory; @@ -46,13 +47,12 @@ import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; +/** The caller must make sure that the Expression only contains valid time predicate */ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter, Void> { @Override protected Filter visitInPredicate(InPredicate node, Void context) { - checkArgument(isTimeColumn(node.getValue())); Expression valueList = node.getValueList(); checkArgument(valueList instanceof InListExpression); List<Expression> values = ((InListExpression) valueList).getValues(); @@ -60,7 +60,7 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter checkArgument(value instanceof LongLiteral); } if (values.size() == 1) { - TimeFilterApi.eq(getLongValue(values.get(0))); + return TimeFilterApi.eq(getLongValue(values.get(0))); } Set<Long> longValues = new HashSet<>(); for (Expression value : values) { @@ -111,28 +111,44 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter long value; if (node.getLeft() instanceof LongLiteral) { value = getLongValue(node.getLeft()); + switch (node.getOperator()) { + case EQUAL: + return TimeFilterApi.eq(value); + case NOT_EQUAL: + return TimeFilterApi.notEq(value); + case GREATER_THAN: + return TimeFilterApi.lt(value); + case GREATER_THAN_OR_EQUAL: + return TimeFilterApi.ltEq(value); + case LESS_THAN: + return TimeFilterApi.gt(value); + case LESS_THAN_OR_EQUAL: + return TimeFilterApi.gtEq(value); + default: + throw new IllegalArgumentException("Unsupported operator: " + node.getOperator()); + } } else if (node.getRight() instanceof LongLiteral) { value = getLongValue(node.getRight()); + switch (node.getOperator()) { + case EQUAL: + return TimeFilterApi.eq(value); + case NOT_EQUAL: + return TimeFilterApi.notEq(value); + case GREATER_THAN: + return TimeFilterApi.gt(value); + case GREATER_THAN_OR_EQUAL: + return TimeFilterApi.gtEq(value); + case LESS_THAN: + return TimeFilterApi.lt(value); + case LESS_THAN_OR_EQUAL: + return TimeFilterApi.ltEq(value); + default: + throw new IllegalArgumentException("Unsupported operator: " + node.getOperator()); + } } else { throw new IllegalStateException( "Either left or right operand of Time ComparisonExpression should be LongLiteral"); } - switch (node.getOperator()) { - case EQUAL: - return TimeFilterApi.eq(value); - case NOT_EQUAL: - return TimeFilterApi.notEq(value); - case GREATER_THAN: - return TimeFilterApi.gt(value); - case GREATER_THAN_OR_EQUAL: - return TimeFilterApi.gtEq(value); - case LESS_THAN: - return TimeFilterApi.lt(value); - case LESS_THAN_OR_EQUAL: - return TimeFilterApi.ltEq(value); - default: - throw new IllegalArgumentException("Unsupported operator: " + node.getOperator()); - } } @Override @@ -161,7 +177,7 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter Expression secondExpression = node.getMin(); Expression thirdExpression = node.getMax(); - if (isTimeColumn(firstExpression)) { + if (firstExpression instanceof SymbolReference) { // firstExpression is TIMESTAMP long minValue = getLongValue(secondExpression); long maxValue = getLongValue(thirdExpression); @@ -170,7 +186,7 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter return TimeFilterApi.eq(minValue); } return TimeFilterApi.between(minValue, maxValue); - } else if (isTimeColumn(secondExpression)) { + } else if (secondExpression instanceof SymbolReference) { // secondExpression is TIMESTAMP long value = getLongValue(firstExpression); long maxValue = getLongValue(thirdExpression); @@ -186,7 +202,7 @@ public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor<Filter value <= maxValue, String.format("Predicate [%s] should be simplified in previous step", node)); return TimeFilterApi.ltEq(value); - } else if (isTimeColumn(thirdExpression)) { + } else if (thirdExpression instanceof SymbolReference) { // thirdExpression is TIMESTAMP long value = getLongValue(firstExpression); long minValue = getLongValue(secondExpression); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanBuilder.java index 1d6cc02990f..f0ff73e5e18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanBuilder.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.BiFunction; import java.util.function.BiPredicate; @@ -56,7 +55,7 @@ public class PlanBuilder { RelationPlan plan, Analysis analysis, Map<ScopeAware<Expression>, Symbol> mappings) { return new PlanBuilder( new TranslationMap( - Optional.empty(), + plan.getOuterContext(), plan.getScope(), analysis, plan.getFieldMappings(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index cfc19a3e7da..3b427458160 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -153,7 +153,10 @@ public class QueryPlanner { builder = builder.appendProjections(outputs, symbolAllocator, queryContext); return new RelationPlan( - builder.getRoot(), analysis.getScope(query), computeOutputs(builder, outputs)); + builder.getRoot(), + analysis.getScope(query), + computeOutputs(builder, outputs), + outerContext); } public RelationPlan plan(QuerySpecification node) { @@ -261,7 +264,7 @@ public class QueryPlanner { builder = builder.appendProjections(outputs, symbolAllocator, queryContext); return new RelationPlan( - builder.getRoot(), analysis.getScope(node), computeOutputs(builder, outputs)); + builder.getRoot(), analysis.getScope(node), computeOutputs(builder, outputs), outerContext); } private static boolean hasExpressionsToUnfold(List<Analysis.SelectExpression> selectExpressions) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java index 32e33e8b9d6..8c82a826039 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlan.java @@ -20,6 +20,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -38,7 +39,13 @@ public class RelationPlan { // for each field in the relation, the corresponding symbol from "root" private final List<Symbol> fieldMappings; - public RelationPlan(PlanNode root, Scope scope, List<Symbol> fieldMappings) { + private final Optional<TranslationMap> outerContext; + + public RelationPlan( + PlanNode root, + Scope scope, + List<Symbol> fieldMappings, + Optional<TranslationMap> outerContext) { requireNonNull(root, "root is null"); requireNonNull(fieldMappings, "fieldMappings is null"); requireNonNull(scope, "scope is null"); @@ -53,6 +60,7 @@ public class RelationPlan { this.root = root; this.scope = scope; this.fieldMappings = ImmutableList.copyOf(fieldMappings); + this.outerContext = outerContext; } public Symbol getSymbol(int fieldIndex) { @@ -78,4 +86,8 @@ public class RelationPlan { public Scope getScope() { return scope; } + + public Optional<TranslationMap> getOuterContext() { + return outerContext; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index ed48a7937bc..af8dec2edaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -154,7 +154,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { // put the pre-planned recursive subquery in the actual outer context to enable resolving // correlation return new RelationPlan( - expansion.getRoot(), expansion.getScope(), expansion.getFieldMappings()); + expansion.getRoot(), expansion.getScope(), expansion.getFieldMappings(), outerContext); } Scope scope = analysis.getScope(table); @@ -199,7 +199,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { outputSymbols, tableColumnSchema, idAndAttributeIndexMap); - return new RelationPlan(tableScanNode, scope, outputSymbols); + return new RelationPlan(tableScanNode, scope, outputSymbols, outerContext); // Collection<Field> fields = analysis.getMaterializedViewStorageTableFields(node); // Query namedQuery = analysis.getNamedQuery(node); @@ -223,8 +223,8 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { @Override protected RelationPlan visitTableSubquery(TableSubquery node, Void context) { RelationPlan plan = process(node.getQuery(), context); - // TODO transmit outerContext - return new RelationPlan(plan.getRoot(), analysis.getScope(node), plan.getFieldMappings()); + return new RelationPlan( + plan.getRoot(), analysis.getScope(node), plan.getFieldMappings(), outerContext); } @Override @@ -365,8 +365,8 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { return new RelationPlan( new ProjectNode(queryContext.getQueryId().genPlanNodeId(), join, assignments.build()), analysis.getScope(node), - outputs.build()); - // outerContext); + outputs.build(), + outerContext); } public RelationPlan planJoin( @@ -567,7 +567,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { } } - return new RelationPlan(root, scope, outputSymbols); + return new RelationPlan(root, scope, outputSymbols, outerContext); } public static JoinNode.JoinType mapJoinType(Join.Type joinType) { @@ -611,7 +611,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { mappings = newMappings.build(); } - return new RelationPlan(root, analysis.getScope(node), mappings); + return new RelationPlan(root, analysis.getScope(node), mappings, outerContext); } // ================================ Implemented later ===================================== @@ -658,14 +658,16 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { insertTabletStatement.getRowCount(), insertTabletStatement.getColumnCategories()); insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber()); - return new RelationPlan(insertNode, analysis.getRootScope(), Collections.emptyList()); + return new RelationPlan( + insertNode, analysis.getRootScope(), Collections.emptyList(), outerContext); } @Override protected RelationPlan visitInsertRow(InsertRow node, Void context) { InsertRowStatement insertRowStatement = node.getInnerTreeStatement(); RelationalInsertRowNode insertNode = fromInsertRowStatement(insertRowStatement); - return new RelationPlan(insertNode, analysis.getRootScope(), Collections.emptyList()); + return new RelationPlan( + insertNode, analysis.getRootScope(), Collections.emptyList(), outerContext); } protected RelationalInsertRowNode fromInsertRowStatement(InsertRowStatement insertRowStatement) { @@ -698,7 +700,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { RelationalInsertRowsNode relationalInsertRowsNode = new RelationalInsertRowsNode(idAllocator.genPlanNodeId(), indices, insertRowStatements); return new RelationPlan( - relationalInsertRowsNode, analysis.getRootScope(), Collections.emptyList()); + relationalInsertRowsNode, analysis.getRootScope(), Collections.emptyList(), outerContext); } @Override @@ -711,7 +713,8 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { new LoadTsFileNode( idAllocator.genPlanNodeId(), node.getResources(), isTableModel, node.getDatabase()), analysis.getRootScope(), - Collections.emptyList()); + Collections.emptyList(), + outerContext); } @Override @@ -724,7 +727,8 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { return new RelationPlan( new PipeEnrichedInsertNode((InsertNode) relationPlan.getRoot()), analysis.getRootScope(), - Collections.emptyList()); + Collections.emptyList(), + outerContext); } throw new IllegalStateException("Other WritePlanNode is not supported in current version."); } @@ -734,6 +738,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { return new RelationPlan( new RelationalDeleteDataNode(idAllocator.genPlanNodeId(), node), analysis.getRootScope(), - Collections.emptyList()); + Collections.emptyList(), + outerContext); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java index 2c188cf12c0..f168eb50d83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java @@ -45,7 +45,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression.Operator.AND; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression.Operator.OR; @@ -63,9 +62,11 @@ public class GlobalTimePredicateExtractVisitor * @return Pair, left is globalTimePredicate, right is if hasValueFilter. * @throws UnknownExpressionTypeException unknown expression type */ - public static Pair<Expression, Boolean> extractGlobalTimeFilter(Expression predicate) { + public static Pair<Expression, Boolean> extractGlobalTimeFilter( + Expression predicate, String timeColumnName) { return new GlobalTimePredicateExtractVisitor() - .process(predicate, new GlobalTimePredicateExtractVisitor.Context(true, true)); + .process( + predicate, new GlobalTimePredicateExtractVisitor.Context(true, true, timeColumnName)); } protected Pair<Expression, Boolean> visitExpression( @@ -106,7 +107,7 @@ public class GlobalTimePredicateExtractVisitor List<Pair<Expression, Boolean>> resultPairs = new ArrayList<>(); for (Expression term : node.getTerms()) { - resultPairs.add(process(term, new Context(false, false))); + resultPairs.add(process(term, new Context(false, false, context.timeColumnName))); } List<Expression> newTimeFilterTerms = new ArrayList<>(); @@ -159,8 +160,8 @@ public class GlobalTimePredicateExtractVisitor ComparisonExpression node, Context context) { Expression leftExpression = node.getLeft(); Expression rightExpression = node.getRight(); - if (checkIsTimeFilter(leftExpression, rightExpression) - || checkIsTimeFilter(rightExpression, leftExpression)) { + if (checkIsTimeFilter(leftExpression, context.timeColumnName, rightExpression) + || checkIsTimeFilter(rightExpression, context.timeColumnName, leftExpression)) { return new Pair<>(node, false); } return new Pair<>(null, true); @@ -196,7 +197,7 @@ public class GlobalTimePredicateExtractVisitor Expression thirdExpression = node.getMax(); boolean isTimeFilter = false; - if (isTimeIdentifier(firstExpression)) { + if (isTimeColumn(firstExpression, context.timeColumnName)) { isTimeFilter = checkBetweenConstantSatisfy(secondExpression, thirdExpression); } // TODO After Constant-Folding introduced @@ -213,7 +214,7 @@ public class GlobalTimePredicateExtractVisitor @Override protected Pair<Expression, Boolean> visitInPredicate(InPredicate node, Context context) { - if (isTimeIdentifier(node.getValue())) { + if (isTimeColumn(node.getValue(), context.timeColumnName)) { return new Pair<>(node, false); } @@ -223,7 +224,9 @@ public class GlobalTimePredicateExtractVisitor @Override protected Pair<Expression, Boolean> visitNotExpression(NotExpression node, Context context) { Pair<Expression, Boolean> result = - process(node.getValue(), new Context(context.canRewrite, context.isFirstOr)); + process( + node.getValue(), + new Context(context.canRewrite, context.isFirstOr, context.timeColumnName)); if (result.left != null) { return new Pair<>(new NotExpression(result.left), result.right); } @@ -261,14 +264,14 @@ public class GlobalTimePredicateExtractVisitor throw new IllegalStateException(String.format(NOT_SUPPORTED, node.getClass())); } - private static boolean isTimeIdentifier(Expression e) { - return e instanceof SymbolReference && TIME.equalsIgnoreCase(((SymbolReference) e).getName()); + public static boolean isTimeColumn(Expression e, String timeColumnName) { + return e instanceof SymbolReference + && ((SymbolReference) e).getName().equalsIgnoreCase(timeColumnName); } - private static boolean checkIsTimeFilter(Expression timeExpression, Expression valueExpression) { - return timeExpression instanceof SymbolReference - && TIME.equalsIgnoreCase(((SymbolReference) timeExpression).getName()) - && valueExpression instanceof LongLiteral; + private static boolean checkIsTimeFilter( + Expression timeExpression, String timeColumnName, Expression valueExpression) { + return isTimeColumn(timeExpression, timeColumnName) && valueExpression instanceof LongLiteral; } private static boolean checkBetweenConstantSatisfy(Expression e1, Expression e2) { @@ -280,10 +283,12 @@ public class GlobalTimePredicateExtractVisitor public static class Context { boolean canRewrite; boolean isFirstOr; + String timeColumnName; - public Context(boolean canRewrite, boolean isFirstOr) { + public Context(boolean canRewrite, boolean isFirstOr, String timeColumnName) { this.canRewrite = canRewrite; this.isFirstOr = isFirstOr; + this.timeColumnName = timeColumnName; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 8e9296b7f48..f9d2d5edb1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -62,10 +62,11 @@ import com.google.common.collect.ImmutableSet; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Pair; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -299,68 +300,15 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { @Override public PlanNode visitTableScan(TableScanNode tableScanNode, RewriteContext context) { - // columnSymbols in TableScanNode may be added suffix in Join situation(such as self join), - // in which we need add a new ProjectNode above TableScanNode. - boolean hasSuffixInScanNodeColumns = false; - for (Map.Entry<Symbol, ColumnSchema> entry : tableScanNode.getAssignments().entrySet()) { - Symbol columnSymbol = entry.getKey(); - ColumnSchema columnSchema = entry.getValue(); - if (!columnSymbol.getName().equals(columnSchema.getName())) { - hasSuffixInScanNodeColumns = true; - break; - } - } - - Map<Symbol, Expression> newProjectAssignments = null; - if (hasSuffixInScanNodeColumns) { - newProjectAssignments = getProjectAssignments(tableScanNode, context); - } // no predicate, just scan all matched deviceEntries if (TRUE_LITERAL.equals(context.inheritedPredicate)) { - getDeviceEntriesWithDataPartitions(tableScanNode, Collections.emptyList()); - return hasSuffixInScanNodeColumns - ? new ProjectNode( - queryId.genPlanNodeId(), tableScanNode, new Assignments(newProjectAssignments)) - : tableScanNode; + getDeviceEntriesWithDataPartitions(tableScanNode, Collections.emptyList(), null); + return tableScanNode; } // has predicate, deal with split predicate - PlanNode result = combineFilterAndScan(tableScanNode, context.inheritedPredicate); - return hasSuffixInScanNodeColumns - ? new ProjectNode(queryId.genPlanNodeId(), result, new Assignments(newProjectAssignments)) - : result; - } - - private Map<Symbol, Expression> getProjectAssignments( - TableScanNode tableScanNode, RewriteContext context) { - context.inheritedPredicate = - ReplaceSymbolInExpression.transform( - context.inheritedPredicate, tableScanNode.getAssignments()); - - int size = tableScanNode.getOutputSymbols().size(); - Map<Symbol, Expression> projectAssignments = new LinkedHashMap<>(size); - List<Symbol> newTableScanSymbols = new ArrayList<>(size); - Map<Symbol, ColumnSchema> newTableScanAssignments = new LinkedHashMap<>(size); - for (Symbol originalSymbol : tableScanNode.getOutputSymbols()) { - ColumnSchema columnSchema = tableScanNode.getAssignments().get(originalSymbol); - - Symbol realSymbol = Symbol.of(columnSchema.getName()); - newTableScanSymbols.add(realSymbol); - newTableScanAssignments.put(realSymbol, columnSchema); - projectAssignments.put(originalSymbol, new SymbolReference(columnSchema.getName())); - queryContext.getTypeProvider().putTableModelType(originalSymbol, columnSchema.getType()); - Map<Symbol, Integer> idAndAttributeIndexMap = tableScanNode.getIdAndAttributeIndexMap(); - if (idAndAttributeIndexMap.containsKey(originalSymbol)) { - Integer idx = idAndAttributeIndexMap.get(originalSymbol); - idAndAttributeIndexMap.remove(originalSymbol); - idAndAttributeIndexMap.put(realSymbol, idx); - } - } - - tableScanNode.setOutputSymbols(newTableScanSymbols); - tableScanNode.setAssignments(newTableScanAssignments); - return projectAssignments; + return combineFilterAndScan(tableScanNode, context.inheritedPredicate); } public PlanNode combineFilterAndScan(TableScanNode tableScanNode, Expression predicate) { @@ -375,7 +323,8 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { : new LogicalExpression(LogicalExpression.Operator.AND, expressions); // extract global time filter and set it to TableScanNode - Pair<Expression, Boolean> resultPair = extractGlobalTimeFilter(pushDownPredicate); + Pair<Expression, Boolean> resultPair = + extractGlobalTimeFilter(pushDownPredicate, splitExpression.getTimeColumnName()); if (resultPair.left != null) { tableScanNode.setTimePredicate(resultPair.left); } @@ -393,7 +342,10 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } // do index scan after expressionCanPushDown is processed - getDeviceEntriesWithDataPartitions(tableScanNode, splitExpression.getMetadataExpressions()); + getDeviceEntriesWithDataPartitions( + tableScanNode, + splitExpression.getMetadataExpressions(), + splitExpression.getTimeColumnName()); // exist expressions can not push down to scan operator if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) { @@ -412,11 +364,14 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { private SplitExpression splitPredicate(TableScanNode node, Expression predicate) { Set<String> idOrAttributeColumnNames = new HashSet<>(node.getAssignments().size()); Set<String> measurementColumnNames = new HashSet<>(node.getAssignments().size()); + String timeColumnName = null; for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { Symbol columnSymbol = entry.getKey(); ColumnSchema columnSchema = entry.getValue(); - if (MEASUREMENT.equals(columnSchema.getColumnCategory()) - || TIME.equals(columnSchema.getColumnCategory())) { + if (TIME.equals(columnSchema.getColumnCategory())) { + measurementColumnNames.add(columnSymbol.getName()); + timeColumnName = columnSymbol.getName(); + } else if (MEASUREMENT.equals(columnSchema.getColumnCategory())) { measurementColumnNames.add(columnSymbol.getName()); } else { idOrAttributeColumnNames.add(columnSymbol.getName()); @@ -442,7 +397,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } return new SplitExpression( - metadataExpressions, expressionsCanPushDown, expressionsCannotPushDown); + metadataExpressions, expressionsCanPushDown, expressionsCannotPushDown, timeColumnName); } if (PredicatePushIntoMetadataChecker.check(idOrAttributeColumnNames, predicate)) { @@ -454,11 +409,11 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } return new SplitExpression( - metadataExpressions, expressionsCanPushDown, expressionsCannotPushDown); + metadataExpressions, expressionsCanPushDown, expressionsCannotPushDown, timeColumnName); } private void getDeviceEntriesWithDataPartitions( - TableScanNode tableScanNode, List<Expression> metadataExpressions) { + TableScanNode tableScanNode, List<Expression> metadataExpressions, String timeColumnName) { List<String> attributeColumns = new ArrayList<>(); int attributeIndex = 0; @@ -475,7 +430,12 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { List<DeviceEntry> deviceEntries = metadata.indexScan( tableScanNode.getQualifiedObjectName(), - metadataExpressions, + metadataExpressions.stream() + .map( + expression -> + ReplaceSymbolInExpression.transform( + expression, tableScanNode.getAssignments())) + .collect(Collectors.toList()), attributeColumns, queryContext); tableScanNode.setDeviceEntries(deviceEntries); @@ -808,15 +768,19 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { // expressions can not push down into TableScan, such as `s_1 is null` List<Expression> expressionsCannotPushDown; + @Nullable String timeColumnName; + public SplitExpression( List<Expression> metadataExpressions, List<Expression> expressionsCanPushDown, - List<Expression> expressionsCannotPushDown) { + List<Expression> expressionsCannotPushDown, + @Nullable String timeColumnName) { this.metadataExpressions = requireNonNull(metadataExpressions, "metadataExpressions is null"); this.expressionsCanPushDown = requireNonNull(expressionsCanPushDown, "expressionsCanPushDown is null"); this.expressionsCannotPushDown = requireNonNull(expressionsCannotPushDown, "expressionsCannotPushDown is null"); + this.timeColumnName = timeColumnName; } public List<Expression> getMetadataExpressions() { @@ -830,5 +794,10 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { public List<Expression> getExpressionsCannotPushDown() { return this.expressionsCannotPushDown; } + + @Nullable + public String getTimeColumnName() { + return timeColumnName; + } } }
