This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch support_exists_and_correlate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fb6c5e51a63add52421457cff4843d83264f7c27 Author: lancelly <[email protected]> AuthorDate: Fri Jan 10 16:42:24 2025 +0800 add AssignUniqueId --- .../operator/process/AssignUniqueIdOperator.java | 142 +++++ .../plan/planner/TableOperatorGenerator.java | 15 + .../plan/planner/plan/node/PlanGraphPrinter.java | 10 + .../plan/planner/plan/node/PlanNodeType.java | 1 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../iterative/rule/PruneAssignUniqueIdColumns.java | 47 ++ .../rule/TransformExistsApplyToCorrelatedJoin.java | 215 +++++++ .../relational/planner/node/AssignUniqueId.java | 97 +++ .../plan/relational/planner/node/Patterns.java | 8 +- .../optimizations/LogicalOptimizeFactory.java | 2 + .../optimizations/PlanNodeDecorrelator.java | 699 +++++++++++++++++++++ .../optimizations/PushPredicateIntoTableScan.java | 15 + ...mQuantifiedComparisonApplyToCorrelatedJoin.java | 27 +- .../optimizations/UnaliasSymbolReferences.java | 13 + .../relational/planner/optimizations/Util.java | 19 + .../planner/assertions/AssignUniqueIdMatcher.java | 49 ++ .../planner/assertions/PlanMatchPattern.java | 6 + 17 files changed, 1344 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java new file mode 100644 index 00000000000..97a64293805 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AssignUniqueIdOperator.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; + +public class AssignUniqueIdOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AssignUniqueIdOperator.class); + + private static final long ROW_IDS_PER_REQUEST = 1L << 20L; + private static final long MAX_ROW_ID = 1L << 40L; + + private final OperatorContext operatorContext; + private final Operator child; + + private final AtomicLong rowIdPool = new AtomicLong(); + private final long uniqueValueMask; + + private long rowIdCounter; + private long maxRowIdCounterValue; + + public AssignUniqueIdOperator(OperatorContext operatorContext, Operator child) { + this.operatorContext = operatorContext; + this.child = child; + + DriverTaskId id = operatorContext.getDriverContext().getDriverTaskID(); + this.uniqueValueMask = + (((long) id.getFragmentId().getId()) << 54) | (((long) id.getPipelineId()) << 40); + requestValues(); + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + public TsBlock next() throws Exception { + TsBlock tsBlock = child.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + return null; + } + return tsBlock.appendValueColumns(new Column[] {generateIdColumn(tsBlock.getPositionCount())}); + } + + @Override + public boolean hasNext() throws Exception { + return child.hasNext(); + } + + @Override + public void close() throws Exception { + if (child != null) { + child.close(); + } + } + + @Override + public boolean isFinished() throws Exception { + return child.isFinished(); + } + + @Override + public long calculateMaxPeekMemory() { + return child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize() + + (long) Long.SIZE * TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return child.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + private Column generateIdColumn(int positionCount) { + LongColumnBuilder columnBuilder = new LongColumnBuilder(null, positionCount); + for (int currentPosition = 0; currentPosition < positionCount; currentPosition++) { + if (rowIdCounter >= maxRowIdCounterValue) { + requestValues(); + } + long rowId = rowIdCounter++; + verify((rowId & uniqueValueMask) == 0, "RowId and uniqueValue mask overlaps"); + columnBuilder.writeLong(uniqueValueMask | rowId); + } + return columnBuilder.build(); + } + + private void requestValues() { + rowIdCounter = rowIdPool.getAndAdd(ROW_IDS_PER_REQUEST); + maxRowIdCounterValue = Math.min(rowIdCounter + ROW_IDS_PER_REQUEST, MAX_ROW_ID); + checkState(rowIdCounter < MAX_ROW_ID, "Unique row id exceeds a limit: %s", MAX_ROW_ID); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 9ca7ddda112..1d99562e5ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -129,6 +129,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -1580,6 +1581,20 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new EnforceSingleRowOperator(operatorContext, child); } + @Override + public Operator visitAssignUniqueId(AssignUniqueId node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + EnforceSingleRowOperator.class.getSimpleName()); + + return new EnforceSingleRowOperator(operatorContext, child); + } + @Override public Operator visitCountMerge( final CountSchemaMergeNode node, final LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 442d59f8be3..c61ab511168 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -66,6 +66,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAg import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; @@ -947,6 +948,15 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitAssignUniqueId(AssignUniqueId node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("AssignUniqueId-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); + boxValue.add(String.format("IdColumnSymbol: %s", node.getIdColumn())); + return render(node, boxValue, context); + } + private String printRegion(TRegionReplicaSet regionReplicaSet) { return String.format( "Partition: %s", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 3e86d189e1e..0dd5788970f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -290,6 +290,7 @@ public enum PlanNodeType { TREE_ALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1023), TREE_NONALIGNED_DEVICE_VIEW_SCAN_NODE((short) 1024), TABLE_SEMI_JOIN_NODE((short) 1025), + TABLE_ASSIGN_UNIQUE_ID((short) 1026), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index d04c57e77fb..bf0099d17fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -654,6 +654,11 @@ public abstract class PlanVisitor<R, C> { return visitTwoChildProcess(node, context); } + public R visitAssignUniqueId( + org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitEnforceSingleRow( org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode node, C context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java new file mode 100644 index 00000000000..a5c675b5fc8 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneAssignUniqueIdColumns.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.assignUniqueId; + +public class PruneAssignUniqueIdColumns extends ProjectOffPushDownRule<AssignUniqueId> { + public PruneAssignUniqueIdColumns() { + super(assignUniqueId()); + } + + @Override + protected Optional<PlanNode> pushDownProjectOff( + Context context, AssignUniqueId assignUniqueId, Set<Symbol> referencedOutputs) { + // remove unused AssignUniqueId node + if (!referencedOutputs.contains(assignUniqueId.getIdColumn())) { + return Optional.of(assignUniqueId.getChild()); + } + + return restrictChildOutputs(context.getIdAllocator(), assignUniqueId, referencedOutputs); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java new file mode 100644 index 00000000000..86f1ec96a69 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/TransformExistsApplyToCorrelatedJoin.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanNodeDecorrelator; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.read.common.type.LongType; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.globalAggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.applyNode; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN; +import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType; +import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; + +/** + * EXISTS is modeled as (if correlated predicates are equality comparisons): + * + * <pre> + * - Project(exists := COALESCE(subqueryTrue, false)) + * - CorrelatedJoin(LEFT) + * - input + * - Project(subqueryTrue := true) + * - Limit(count=1) + * - subquery + * </pre> + * + * or: + * + * <pre> + * - CorrelatedJoin(LEFT) + * - input + * - Project($0 > 0) + * - Aggregation(COUNT(*)) + * - subquery + * </pre> + * + * otherwise + */ +public class TransformExistsApplyToCorrelatedJoin implements Rule<ApplyNode> { + private static final Pattern<ApplyNode> PATTERN = applyNode(); + + private final PlannerContext plannerContext; + + public TransformExistsApplyToCorrelatedJoin(PlannerContext plannerContext) { + this.plannerContext = requireNonNull(plannerContext, "plannerContext is null"); + } + + @Override + public Pattern<ApplyNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(ApplyNode parent, Captures captures, Context context) { + if (parent.getSubqueryAssignments().size() != 1) { + return Result.empty(); + } + + ApplyNode.SetExpression expression = getOnlyElement(parent.getSubqueryAssignments().values()); + if (!(expression instanceof ApplyNode.Exists)) { + return Result.empty(); + } + + /* + Empty correlation list indicates that the subquery contains no correlation symbols from the + immediate outer scope. The subquery might be either not correlated at all, or correlated with + symbols from further outer scope. + Currently, the two cases are indistinguishable. + To support the latter case, the ApplyNode with empty correlation list is rewritten to default + aggregation, which is inefficient in the rare case of uncorrelated EXISTS subquery, + but currently allows to successfully decorrelate a correlated EXISTS subquery. + + TODO: remove this condition when exploratory optimizer is implemented or support for decorrelating joins is implemented in PlanNodeDecorrelator + */ + if (parent.getCorrelation().isEmpty()) { + return Result.ofPlanNode(rewriteToDefaultAggregation(parent, context)); + } + + Optional<PlanNode> nonDefaultAggregation = rewriteToNonDefaultAggregation(parent, context); + return nonDefaultAggregation + .map(Result::ofPlanNode) + .orElseGet(() -> Result.ofPlanNode(rewriteToDefaultAggregation(parent, context))); + } + + private Optional<PlanNode> rewriteToNonDefaultAggregation(ApplyNode applyNode, Context context) { + checkState( + applyNode.getSubquery().getOutputSymbols().isEmpty(), + "Expected subquery output symbols to be pruned"); + + Symbol subqueryTrue = context.getSymbolAllocator().newSymbol("subqueryTrue", BOOLEAN); + + PlanNode subquery = + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + new LimitNode( + context.getIdAllocator().genPlanNodeId(), + applyNode.getSubquery(), + 1L, + Optional.empty()), + Assignments.of(subqueryTrue, TRUE_LITERAL)); + + PlanNodeDecorrelator decorrelator = + new PlanNodeDecorrelator(plannerContext, context.getSymbolAllocator(), context.getLookup()); + if (!decorrelator.decorrelateFilters(subquery, applyNode.getCorrelation()).isPresent()) { + return Optional.empty(); + } + + Symbol exists = getOnlyElement(applyNode.getSubqueryAssignments().keySet()); + Assignments.Builder assignments = + Assignments.builder() + .putIdentities(applyNode.getInput().getOutputSymbols()) + .put( + exists, + new CoalesceExpression( + ImmutableList.of( + subqueryTrue.toSymbolReference(), BooleanLiteral.FALSE_LITERAL))); + + return Optional.of( + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + new CorrelatedJoinNode( + applyNode.getPlanNodeId(), + applyNode.getInput(), + subquery, + applyNode.getCorrelation(), + LEFT, + TRUE_LITERAL, + applyNode.getOriginSubquery()), + assignments.build())); + } + + private PlanNode rewriteToDefaultAggregation(ApplyNode applyNode, Context context) { + ResolvedFunction countFunction = + getResolvedBuiltInAggregateFunction( + plannerContext.getMetadata(), "count", ImmutableList.of()); + Symbol count = context.getSymbolAllocator().newSymbol("count", LongType.getInstance()); + Symbol exists = getOnlyElement(applyNode.getSubqueryAssignments().keySet()); + + return new CorrelatedJoinNode( + applyNode.getPlanNodeId(), + applyNode.getInput(), + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + singleAggregation( + context.getIdAllocator().genPlanNodeId(), + applyNode.getSubquery(), + ImmutableMap.of( + count, + new AggregationNode.Aggregation( + countFunction, + ImmutableList.of(), + false, + Optional.empty(), + Optional.empty(), + Optional.empty())), + globalAggregation()), + Assignments.of( + exists, + new ComparisonExpression( + GREATER_THAN, + count.toSymbolReference(), + new Cast(new LongLiteral("0"), toSqlType(LongType.getInstance()))))), + applyNode.getCorrelation(), + INNER, + TRUE_LITERAL, + applyNode.getOriginSubquery()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java new file mode 100644 index 00000000000..3528d8d3649 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AssignUniqueId.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class AssignUniqueId extends SingleChildProcessNode { + private final Symbol idColumn; + + public AssignUniqueId(PlanNodeId id, PlanNode child, Symbol idColumn) { + super(id); + this.child = child; + this.idColumn = requireNonNull(idColumn, "idColumn is null"); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitAssignUniqueId(this, context); + } + + @Override + public List<Symbol> getOutputSymbols() { + return ImmutableList.<Symbol>builder().addAll(child.getOutputSymbols()).add(idColumn).build(); + } + + @Override + public PlanNode clone() { + // clone without child + return new AssignUniqueId(id, null, idColumn); + } + + @Override + public PlanNode replaceChildren(List<PlanNode> newChildren) { + checkArgument(newChildren.size() == 1, "expected newChildren to contain 1 node"); + return new AssignUniqueId(getPlanNodeId(), Iterables.getOnlyElement(newChildren), idColumn); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_ASSIGN_UNIQUE_ID.serialize(byteBuffer); + Symbol.serialize(idColumn, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_ASSIGN_UNIQUE_ID.serialize(stream); + Symbol.serialize(idColumn, stream); + } + + public static AssignUniqueId deserialize(ByteBuffer byteBuffer) { + Symbol idColumn = Symbol.deserialize(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new AssignUniqueId(planNodeId, null, idColumn); + } + + @Override + public List<String> getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + public Symbol getIdColumn() { + return idColumn; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java index 604e2b06a3a..f676fcd0ca5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java @@ -42,10 +42,10 @@ public final class Patterns { return typeOf(AggregationNode.class); } - // public static Pattern<AssignUniqueId> assignUniqueId() - // { - // return typeOf(AssignUniqueId.class); - // } + public static Pattern<AssignUniqueId> assignUniqueId() { + return typeOf(AssignUniqueId.class); + } + // // public static Pattern<GroupIdNode> groupId() // { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index b3766383c18..de2b245bfd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyCorrelation; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplySourceColumns; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAssignUniqueIdColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneDistinctAggregation; @@ -86,6 +87,7 @@ public class LogicalOptimizeFactory { new PruneApplyColumns(), new PruneApplyCorrelation(), new PruneApplySourceColumns(), + new PruneAssignUniqueIdColumns(), new PruneCorrelatedJoinColumns(), new PruneCorrelatedJoinCorrelation(), new PruneEnforceSingleRowColumns(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java new file mode 100644 index 00000000000..8527f74191c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PlanNodeDecorrelator.java @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; +import org.apache.iotdb.db.queryengine.plan.relational.type.TypeCoercion; +import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator.isDeterministic; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.combineConjuncts; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractConjuncts; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SymbolMapper.symbolMapper; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL; + +public class PlanNodeDecorrelator { + private final PlannerContext plannerContext; + private final SymbolAllocator symbolAllocator; + private final Lookup lookup; + private final TypeCoercion typeCoercion; + + public PlanNodeDecorrelator( + PlannerContext plannerContext, SymbolAllocator symbolAllocator, Lookup lookup) { + this.plannerContext = requireNonNull(plannerContext, "plannerContext is null"); + this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); + this.lookup = requireNonNull(lookup, "lookup is null"); + this.typeCoercion = new TypeCoercion(plannerContext.getTypeManager()::getType); + } + + public Optional<DecorrelatedNode> decorrelateFilters(PlanNode node, List<Symbol> correlation) { + if (correlation.isEmpty()) { + return Optional.of(new DecorrelatedNode(ImmutableList.of(), node)); + } + + Optional<DecorrelationResult> decorrelationResultOptional = + node.accept(new DecorrelatingVisitor(plannerContext.getTypeManager(), correlation), null); + return decorrelationResultOptional.flatMap( + decorrelationResult -> + decorrelatedNode( + decorrelationResult.correlatedPredicates, decorrelationResult.node, correlation)); + } + + private class DecorrelatingVisitor extends PlanVisitor<Optional<DecorrelationResult>, Void> { + private final TypeManager typeManager; + private final List<Symbol> correlation; + + DecorrelatingVisitor(TypeManager typeManager, List<Symbol> correlation) { + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.correlation = requireNonNull(correlation, "correlation is null"); + } + + @Override + public Optional<DecorrelationResult> visitPlan(PlanNode node, Void context) { + if (containsCorrelation(node, correlation)) { + return Optional.empty(); + } + return Optional.of( + new DecorrelationResult( + node, + ImmutableSet.of(), + ImmutableList.of(), + ImmutableMultimap.of(), + ImmutableSet.of(), + false)); + } + + @Override + public Optional<DecorrelationResult> visitGroupReference(GroupReference node, Void context) { + return lookup.resolve(node).accept(this, null); + } + + @Override + public Optional<DecorrelationResult> visitFilter(FilterNode node, Void context) { + Optional<DecorrelationResult> childDecorrelationResultOptional = + Optional.of( + new DecorrelationResult( + node.getChild(), + ImmutableSet.of(), + ImmutableList.of(), + ImmutableMultimap.of(), + ImmutableSet.of(), + false)); + + // try to decorrelate filters down the tree + if (containsCorrelation(node.getChild(), correlation)) { + childDecorrelationResultOptional = node.getChild().accept(this, null); + } + + if (!childDecorrelationResultOptional.isPresent()) { + return Optional.empty(); + } + + Expression predicate = node.getPredicate(); + Map<Boolean, List<Expression>> predicates = + extractConjuncts(predicate).stream() + .collect(Collectors.partitioningBy(DecorrelatingVisitor.this::isCorrelated)); + List<Expression> correlatedPredicates = ImmutableList.copyOf(predicates.get(true)); + List<Expression> uncorrelatedPredicates = ImmutableList.copyOf(predicates.get(false)); + + DecorrelationResult childDecorrelationResult = childDecorrelationResultOptional.get(); + FilterNode newFilterNode = + new FilterNode( + node.getPlanNodeId(), + childDecorrelationResult.node, + combineConjuncts(uncorrelatedPredicates)); + + Set<Symbol> symbolsToPropagate = + Sets.difference( + SymbolsExtractor.extractUnique(correlatedPredicates), + ImmutableSet.copyOf(correlation)); + return Optional.of( + new DecorrelationResult( + newFilterNode, + Sets.union(childDecorrelationResult.symbolsToPropagate, symbolsToPropagate), + ImmutableList.<Expression>builder() + .addAll(childDecorrelationResult.correlatedPredicates) + .addAll(correlatedPredicates) + .build(), + ImmutableMultimap.<Symbol, Symbol>builder() + .putAll(childDecorrelationResult.correlatedSymbolsMapping) + .putAll(extractCorrelatedSymbolsMapping(correlatedPredicates)) + .build(), + ImmutableSet.<Symbol>builder() + .addAll(childDecorrelationResult.constantSymbols) + .addAll(extractConstantSymbols(correlatedPredicates)) + .build(), + childDecorrelationResult.atMostSingleRow)); + } + + @Override + public Optional<DecorrelationResult> visitLimit(LimitNode node, Void context) { + if (node.getCount() == 0 || node.isWithTies()) { + return Optional.empty(); + } + + Optional<DecorrelationResult> childDecorrelationResultOptional = + node.getChild().accept(this, null); + if (!childDecorrelationResultOptional.isPresent()) { + return Optional.empty(); + } + + DecorrelationResult childDecorrelationResult = childDecorrelationResultOptional.get(); + if (childDecorrelationResult.atMostSingleRow) { + return childDecorrelationResultOptional; + } + + if (node.getCount() == 1) { + return rewriteLimitWithRowCountOne(childDecorrelationResult, node.getPlanNodeId()); + } + throw new SemanticException( + "Decorrelation for LIMIT with row count greater than 1 is not supported yet"); + // return rewriteLimitWithRowCountGreaterThanOne(childDecorrelationResult, node); + } + + // TODO Limit (1) could be decorrelated by the method rewriteLimitWithRowCountGreaterThanOne() + // as well. + // The current decorrelation method for Limit (1) cannot deal with subqueries outputting other + // symbols + // than constants. + // + // An example query that is currently not supported: + // SELECT ( + // SELECT a+b + // FROM (VALUES (1, 2), (1, 2)) inner_relation(a, b) + // WHERE a=x + // LIMIT 1) + // FROM (VALUES (1)) outer_relation(x) + // + // Switching the decorrelation method would change the way that queries with EXISTS are + // executed, + // and thus it needs benchmarking. + private Optional<DecorrelationResult> rewriteLimitWithRowCountOne( + DecorrelationResult childDecorrelationResult, PlanNodeId nodeId) { + PlanNode decorrelatedChildNode = childDecorrelationResult.node; + Set<Symbol> constantSymbols = childDecorrelationResult.getConstantSymbols(); + + if (constantSymbols.isEmpty() + || !constantSymbols.containsAll(decorrelatedChildNode.getOutputSymbols())) { + return Optional.empty(); + } + + // rewrite Limit to aggregation on constant symbols + AggregationNode aggregationNode = + singleAggregation( + nodeId, + decorrelatedChildNode, + ImmutableMap.of(), + singleGroupingSet(decorrelatedChildNode.getOutputSymbols())); + + return Optional.of( + new DecorrelationResult( + aggregationNode, + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + true)); + } + + /* + private Optional<DecorrelationResult> rewriteLimitWithRowCountGreaterThanOne( + DecorrelationResult childDecorrelationResult, LimitNode node) { + PlanNode decorrelatedChildNode = childDecorrelationResult.node; + + // no rewrite needed (no symbols to partition by) + if (childDecorrelationResult.symbolsToPropagate.isEmpty()) { + return Optional.of( + new DecorrelationResult( + node.replaceChildren(ImmutableList.of(decorrelatedChildNode)), + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + false)); + } + + Set<Symbol> constantSymbols = childDecorrelationResult.getConstantSymbols(); + if (!constantSymbols.containsAll(childDecorrelationResult.symbolsToPropagate)) { + return Optional.empty(); + } + + // rewrite Limit to RowNumberNode partitioned by constant symbols + RowNumberNode rowNumberNode = + new RowNumberNode( + node.getPlanNodeId(), + decorrelatedChildNode, + ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate), + false, + symbolAllocator.newSymbol("row_number", BIGINT), + Optional.of(toIntExact(node.getCount())), + Optional.empty()); + + return Optional.of( + new DecorrelationResult( + rowNumberNode, + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + false)); + }*/ + + @Override + public Optional<DecorrelationResult> visitTopK(TopKNode node, Void context) { + throw new SemanticException("TopK is not supported in correlated subquery for now"); + } + + /*@Override + public Optional<DecorrelationResult> visitTopK(TopKNode node, Void context) + { + if (node.getCount() == 0) { + return Optional.empty(); + } + + checkState(node.getChildren().size() == 1, "Expected TopKNode to have a single child"); + Optional<DecorrelationResult> childDecorrelationResultOptional = node.getChildren().get(0).accept(this, null); + if (!childDecorrelationResultOptional.isPresent()) { + return Optional.empty(); + } + + DecorrelationResult childDecorrelationResult = childDecorrelationResultOptional.get(); + if (childDecorrelationResult.atMostSingleRow) { + return childDecorrelationResultOptional; + } + + PlanNode decorrelatedChildNode = childDecorrelationResult.node; + Set<Symbol> constantSymbols = childDecorrelationResult.getConstantSymbols(); + Optional<OrderingScheme> decorrelatedOrderingScheme = decorrelateOrderingScheme(node.getOrderingScheme(), constantSymbols); + + // no partitioning needed (no symbols to partition by) + if (childDecorrelationResult.symbolsToPropagate.isEmpty()) { + return decorrelatedOrderingScheme + .map(orderingScheme -> new DecorrelationResult( + // ordering symbols are present - return decorrelated TopNNode + new TopKNode(node.getPlanNodeId(), decorrelatedChildNode, node.getCount(), orderingScheme, node.getStep()), + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + node.getCount() == 1)) + .or(() -> Optional.of(new DecorrelationResult( + // no ordering symbols are left - convert to LimitNode + new LimitNode(node.getPlanNodeId(), decorrelatedChildNode, node.getCount(), Optional.empty()), + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + node.getCount() == 1))); + } + + if (!constantSymbols.containsAll(childDecorrelationResult.symbolsToPropagate)) { + return Optional.empty(); + } + + return decorrelatedOrderingScheme + .map(orderingScheme -> { + // ordering symbols are present - rewrite TopN to TopNRankingNode partitioned by constant symbols + TopNRankingNode topNRankingNode = new TopNRankingNode( + node.getId(), + decorrelatedChildNode, + new DataOrganizationSpecification( + ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate), + Optional.of(orderingScheme)), + ROW_NUMBER, + symbolAllocator.newSymbol("ranking", BIGINT), + toIntExact(node.getCount()), + false, + Optional.empty()); + + return Optional.of(new DecorrelationResult( + topNRankingNode, + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + node.getCount() == 1)); + }) + .orElseGet(() -> { + // no ordering symbols are left - rewrite TopN to RowNumberNode partitioned by constant symbols + RowNumberNode rowNumberNode = new RowNumberNode( + node.getId(), + decorrelatedChildNode, + ImmutableList.copyOf(childDecorrelationResult.symbolsToPropagate), + false, + symbolAllocator.newSymbol("row_number", BIGINT), + Optional.of(toIntExact(node.getCount())), + Optional.empty()); + + return Optional.of(new DecorrelationResult( + rowNumberNode, + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + node.getCount() == 1)); + }); + } + + private Optional<OrderingScheme> decorrelateOrderingScheme( + OrderingScheme orderingScheme, Set<Symbol> constantSymbols) { + // remove local and remote constant sort symbols from the OrderingScheme + ImmutableList.Builder<Symbol> nonConstantOrderBy = ImmutableList.builder(); + ImmutableMap.Builder<Symbol, SortOrder> nonConstantOrderings = ImmutableMap.builder(); + for (Symbol symbol : orderingScheme.getOrderBy()) { + if (!constantSymbols.contains(symbol) && !correlation.contains(symbol)) { + nonConstantOrderBy.add(symbol); + nonConstantOrderings.put(symbol, orderingScheme.getOrdering(symbol)); + } + } + if (nonConstantOrderBy.build().isEmpty()) { + return Optional.empty(); + } + return Optional.of( + new OrderingScheme(nonConstantOrderBy.build(), nonConstantOrderings.buildOrThrow())); + }*/ + + @Override + public Optional<DecorrelationResult> visitAggregation(AggregationNode node, Void context) { + // Aggregation with global grouping cannot be converted to aggregation grouped on constants. + // Theoretically, if there are no constants to group on, the aggregation could be successfully + // decorrelated. + // However, it can only happen when Aggregation's source plan is not correlated. + // Then: + // - either we should not reach here because uncorrelated subplans of correlated filters are + // not explored, + // - or the aggregation contains correlation which is unresolvable. This is indicated by + // returning Optional.empty(). + if (node.hasEmptyGroupingSet()) { + return Optional.empty(); + } + + if (node.getGroupingSetCount() != 1) { + return Optional.empty(); + } + + Optional<DecorrelationResult> childDecorrelationResultOptional = + node.getChild().accept(this, null); + if (!childDecorrelationResultOptional.isPresent()) { + return Optional.empty(); + } + + DecorrelationResult childDecorrelationResult = childDecorrelationResultOptional.get(); + Set<Symbol> constantSymbols = childDecorrelationResult.getConstantSymbols(); + + AggregationNode decorrelatedAggregation = + childDecorrelationResult + .getCorrelatedSymbolMapper() + .map(node, childDecorrelationResult.node); + + Set<Symbol> groupingKeys = ImmutableSet.copyOf(node.getGroupingKeys()); + checkState( + ImmutableSet.copyOf(decorrelatedAggregation.getGroupingKeys()).equals(groupingKeys), + "grouping keys were correlated"); + List<Symbol> symbolsToAdd = + childDecorrelationResult.symbolsToPropagate.stream() + .filter(symbol -> !groupingKeys.contains(symbol)) + .collect(toImmutableList()); + + if (!constantSymbols.containsAll(symbolsToAdd)) { + return Optional.empty(); + } + + AggregationNode newAggregation = + AggregationNode.builderFrom(decorrelatedAggregation) + .setGroupingSets( + singleGroupingSet( + ImmutableList.<Symbol>builder() + .addAll(node.getGroupingKeys()) + .addAll(symbolsToAdd) + .build())) + .setPreGroupedSymbols(ImmutableList.of()) + .build(); + + return Optional.of( + new DecorrelationResult( + newAggregation, + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + constantSymbols.containsAll(newAggregation.getGroupingKeys()))); + } + + @Override + public Optional<DecorrelationResult> visitProject(ProjectNode node, Void context) { + Optional<DecorrelationResult> childDecorrelationResultOptional = + node.getChild().accept(this, null); + if (!childDecorrelationResultOptional.isPresent()) { + return Optional.empty(); + } + + DecorrelationResult childDecorrelationResult = childDecorrelationResultOptional.get(); + Set<Symbol> nodeOutputSymbols = ImmutableSet.copyOf(node.getOutputSymbols()); + List<Symbol> symbolsToAdd = + childDecorrelationResult.symbolsToPropagate.stream() + .filter(symbol -> !nodeOutputSymbols.contains(symbol)) + .collect(toImmutableList()); + + Assignments assignments = + Assignments.builder().putAll(node.getAssignments()).putIdentities(symbolsToAdd).build(); + + return Optional.of( + new DecorrelationResult( + new ProjectNode(node.getPlanNodeId(), childDecorrelationResult.node, assignments), + childDecorrelationResult.symbolsToPropagate, + childDecorrelationResult.correlatedPredicates, + childDecorrelationResult.correlatedSymbolsMapping, + childDecorrelationResult.constantSymbols, + childDecorrelationResult.atMostSingleRow)); + } + + private Multimap<Symbol, Symbol> extractCorrelatedSymbolsMapping( + List<Expression> correlatedConjuncts) { + ImmutableMultimap.Builder<Symbol, Symbol> mapping = ImmutableMultimap.builder(); + for (Expression conjunct : correlatedConjuncts) { + if (!(conjunct instanceof ComparisonExpression)) { + continue; + } + + ComparisonExpression comparison = (ComparisonExpression) conjunct; + + if (!(comparison.getLeft() instanceof SymbolReference + && comparison.getRight() instanceof SymbolReference + && comparison.getOperator() == EQUAL)) { + continue; + } + + Symbol left = Symbol.from(comparison.getLeft()); + Symbol right = Symbol.from(comparison.getRight()); + + if (correlation.contains(left) && !correlation.contains(right)) { + mapping.put(left, right); + } + + if (correlation.contains(right) && !correlation.contains(left)) { + mapping.put(right, left); + } + } + + return mapping.build(); + } + + private Set<Symbol> extractConstantSymbols(List<Expression> correlatedConjuncts) { + ImmutableSet.Builder<Symbol> constants = ImmutableSet.builder(); + + correlatedConjuncts.stream() + .filter(ComparisonExpression.class::isInstance) + .map(ComparisonExpression.class::cast) + .filter(comparison -> comparison.getOperator() == EQUAL) + .forEach( + comparison -> { + Expression left = comparison.getLeft(); + Expression right = comparison.getRight(); + + if (!isCorrelated(left) + && (left instanceof SymbolReference || isSimpleInjectiveCast(left)) + && isConstant(right)) { + constants.add(getSymbol(left)); + } + + if (!isCorrelated(right) + && (right instanceof SymbolReference || isSimpleInjectiveCast(right)) + && isConstant(left)) { + constants.add(getSymbol(right)); + } + }); + + return constants.build(); + } + + // checks whether the expression is a deterministic combination of correlation symbols + private boolean isConstant(Expression expression) { + return isDeterministic(expression) + && ImmutableSet.copyOf(correlation) + .containsAll(SymbolsExtractor.extractUnique(expression)); + } + + // checks whether the expression is an injective cast over a symbol + private boolean isSimpleInjectiveCast(Expression expression) { + if (!(expression instanceof Cast)) { + return false; + } + Cast cast = (Cast) expression; + if (!(cast.getExpression() instanceof SymbolReference)) { + return false; + } + // simply return true for now + return true; + /*Symbol sourceSymbol = Symbol.from(cast.getExpression()); + + Type sourceType = symbolAllocator.getTypes().getTableModelType(sourceSymbol); + Type targetType = typeManager.getType(toTypeSignature(((Cast) expression).getType())); + + return typeCoercion.isInjectiveCoercion(sourceType, targetType);*/ + } + + private Symbol getSymbol(Expression expression) { + if (expression instanceof SymbolReference) { + return Symbol.from(expression); + } + return Symbol.from(((Cast) expression).getExpression()); + } + + private boolean isCorrelated(Expression expression) { + return correlation.stream().anyMatch(SymbolsExtractor.extractUnique(expression)::contains); + } + } + + private static class DecorrelationResult { + final PlanNode node; + final Set<Symbol> symbolsToPropagate; + final List<Expression> correlatedPredicates; + + // mapping from correlated symbols to their uncorrelated equivalence + final Multimap<Symbol, Symbol> correlatedSymbolsMapping; + + // local (uncorrelated) symbols known to be constant based on their dependency on correlation + // symbols + // they are derived from the filter predicate, e.g. + // a = corr --> a is constant + // b = f(corr_1, corr_2, ...) --> b is constant provided that f is deterministic + // cast(c AS ...) = corr --> c is constant provided that cast is injective + final Set<Symbol> constantSymbols; + + // If a subquery has at most single row for any correlation values? + final boolean atMostSingleRow; + + DecorrelationResult( + PlanNode node, + Set<Symbol> symbolsToPropagate, + List<Expression> correlatedPredicates, + Multimap<Symbol, Symbol> correlatedSymbolsMapping, + Set<Symbol> constantSymbols, + boolean atMostSingleRow) { + this.node = node; + this.symbolsToPropagate = symbolsToPropagate; + this.correlatedPredicates = correlatedPredicates; + this.atMostSingleRow = atMostSingleRow; + this.correlatedSymbolsMapping = correlatedSymbolsMapping; + this.constantSymbols = constantSymbols; + checkState( + constantSymbols.containsAll(correlatedSymbolsMapping.values()), + "Expected constant symbols to contain all correlated symbols local equivalents"); + checkState( + symbolsToPropagate.containsAll(constantSymbols), + "Expected symbols to propagate to contain all constant symbols"); + } + + SymbolMapper getCorrelatedSymbolMapper() { + return symbolMapper( + correlatedSymbolsMapping.asMap().entrySet().stream() + .collect( + toImmutableMap( + Map.Entry::getKey, symbols -> Iterables.getLast(symbols.getValue())))); + } + + /** + * @return constant symbols from a perspective of a subquery + */ + Set<Symbol> getConstantSymbols() { + return constantSymbols; + } + } + + private Optional<DecorrelatedNode> decorrelatedNode( + List<Expression> correlatedPredicates, PlanNode node, List<Symbol> correlation) { + if (containsCorrelation(node, correlation)) { + // node is still correlated ; / + return Optional.empty(); + } + return Optional.of(new DecorrelatedNode(correlatedPredicates, node)); + } + + private boolean containsCorrelation(PlanNode node, List<Symbol> correlation) { + return Sets.union( + SymbolsExtractor.extractUnique(node, lookup), + SymbolsExtractor.extractOutputSymbols(node, lookup)) + .stream() + .anyMatch(correlation::contains); + } + + public static class DecorrelatedNode { + private final List<Expression> correlatedPredicates; + private final PlanNode node; + + public DecorrelatedNode(List<Expression> correlatedPredicates, PlanNode node) { + requireNonNull(correlatedPredicates, "correlatedPredicates is null"); + this.correlatedPredicates = ImmutableList.copyOf(correlatedPredicates); + this.node = requireNonNull(node, "node is null"); + } + + public Optional<Expression> getCorrelatedPredicates() { + if (correlatedPredicates.isEmpty()) { + return Optional.empty(); + } + return Optional.of(and(correlatedPredicates)); + } + + public PlanNode getNode() { + return node; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index f039c09a701..ba9487c2ba9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ReplaceSymbolInExpression; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; @@ -81,6 +82,7 @@ import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE; @@ -984,6 +986,19 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { return output; } + @Override + public PlanNode visitAssignUniqueId(AssignUniqueId node, RewriteContext context) { + Expression inheritedPredicate = + context.inheritedPredicate != null ? context.inheritedPredicate : TRUE_LITERAL; + Set<Symbol> predicateSymbols = extractUnique(inheritedPredicate); + checkState( + !predicateSymbols.contains(node.getIdColumn()), + "UniqueId in predicate is not yet supported"); + PlanNode rewrittenChild = node.getChild().accept(this, context); + node.replaceChildren(ImmutableList.of(rewrittenChild)); + return node; + } + @Override public PlanNode visitInsertTablet(InsertTabletNode node, RewriteContext context) { return node; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java index 00fecc1a69a..a867faaa6e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformQuantifiedComparisonApplyToCorrelatedJoin.java @@ -21,12 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; -import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId; -import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; import org.apache.iotdb.db.queryengine.plan.relational.planner.SimplePlanRewriter; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -54,7 +49,6 @@ import org.apache.tsfile.read.common.type.Type; import java.util.EnumSet; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.function.Function; @@ -66,6 +60,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.globalAggregation; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode.Quantifier.ALL; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.FALSE_LITERAL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL; @@ -142,7 +137,7 @@ public class TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO minValue, new AggregationNode.Aggregation( getResolvedBuiltInAggregateFunction( - "min", ImmutableList.of(outputColumnType)), + metadata, "min", ImmutableList.of(outputColumnType)), outputColumnReferences, false, Optional.empty(), @@ -151,7 +146,7 @@ public class TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO maxValue, new AggregationNode.Aggregation( getResolvedBuiltInAggregateFunction( - "max", ImmutableList.of(outputColumnType)), + metadata, "max", ImmutableList.of(outputColumnType)), outputColumnReferences, false, Optional.empty(), @@ -160,7 +155,7 @@ public class TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO countAllValue, new AggregationNode.Aggregation( getResolvedBuiltInAggregateFunction( - "count_all", ImmutableList.of(outputColumnType)), + metadata, "count_all", ImmutableList.of(outputColumnType)), outputColumnReferences, false, Optional.empty(), @@ -169,7 +164,7 @@ public class TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO countNonNullValue, new AggregationNode.Aggregation( getResolvedBuiltInAggregateFunction( - "count", ImmutableList.of(outputColumnType)), + metadata, "count", ImmutableList.of(outputColumnType)), outputColumnReferences, false, Optional.empty(), @@ -197,18 +192,6 @@ public class TransformQuantifiedComparisonApplyToCorrelatedJoin implements PlanO join, Assignments.of(quantifiedComparisonSymbol, valueComparedToSubquery)); } - private ResolvedFunction getResolvedBuiltInAggregateFunction( - String functionName, List<Type> argumentTypes) { - // The same as the code in ExpressionAnalyzer - Type type = metadata.getFunctionReturnType(functionName, argumentTypes); - return new ResolvedFunction( - new BoundSignature(functionName.toLowerCase(Locale.ENGLISH), type, argumentTypes), - new FunctionId("noop"), - FunctionKind.AGGREGATE, - true, - FunctionNullability.getAggregationFunctionNullability(argumentTypes.size())); - } - public Expression rewriteUsingBounds( ApplyNode.QuantifiedComparison quantifiedComparison, Symbol minValue, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 2b414438983..2e32cc13acd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -760,6 +761,18 @@ public class UnaliasSymbolReferences implements PlanOptimizer { newSemiJoinOutput), outputMapping); } + + @Override + public PlanAndMappings visitAssignUniqueId(AssignUniqueId node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + Symbol newUnique = mapper.map(node.getIdColumn()); + + return new PlanAndMappings( + new AssignUniqueId(node.getPlanNodeId(), rewrittenSource.getRoot(), newUnique), mapping); + } } private static class UnaliasContext { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java index 83798c451bc..523dd09f9f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java @@ -21,6 +21,11 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; +import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId; +import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; @@ -33,6 +38,8 @@ import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.utils.Pair; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -217,4 +224,16 @@ public class Util { node.getGroupIdSymbol()), rightResult); } + + public static ResolvedFunction getResolvedBuiltInAggregateFunction( + Metadata metadata, String functionName, List<Type> argumentTypes) { + // The same as the code in ExpressionAnalyzer + Type type = metadata.getFunctionReturnType(functionName, argumentTypes); + return new ResolvedFunction( + new BoundSignature(functionName.toLowerCase(Locale.ENGLISH), type, argumentTypes), + new FunctionId("noop"), + FunctionKind.AGGREGATE, + true, + FunctionNullability.getAggregationFunctionNullability(argumentTypes.size())); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java new file mode 100644 index 00000000000..6ff4a6b9e4c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/AssignUniqueIdMatcher.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.assertions; + +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; + +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class AssignUniqueIdMatcher implements RvalueMatcher { + @Override + public Optional<Symbol> getAssignedSymbol( + PlanNode node, SessionInfo sessionInfo, Metadata metadata, SymbolAliases symbolAliases) { + if (!(node instanceof AssignUniqueId)) { + return Optional.empty(); + } + + AssignUniqueId assignUniqueIdNode = (AssignUniqueId) node; + + return Optional.of(assignUniqueIdNode.getIdColumn()); + } + + @Override + public String toString() { + return toStringHelper(this).toString(); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java index 5c2ce8e2f80..6d8b3e98628 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -472,6 +473,11 @@ public final class PlanMatchPattern { .with(new SemiJoinMatcher(sourceSymbolAlias, filteringSymbolAlias, outputAlias)); } + public static PlanMatchPattern assignUniqueId(String uniqueSymbolAlias, PlanMatchPattern source) { + return node(AssignUniqueId.class, source) + .withAlias(uniqueSymbolAlias, new AssignUniqueIdMatcher()); + } + public static PlanMatchPattern streamSort(PlanMatchPattern source) { return node(StreamSortNode.class, source); }
