This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch support_fold_scalar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8d917320982017f6d577bfefd4579f289e01fcbe Author: lancelly <[email protected]> AuthorDate: Mon Mar 3 19:49:24 2025 +0800 tmp --- .../plan/relational/planner/SubqueryPlanner.java | 8 ++ .../relational/planner/ir/TestTsBlockBuilder.java | 117 +++++++++++++++++ .../ir/TimePredicateWithSubqueryReconstructer.java | 142 +++++++++++++++++++++ .../relational/sql/ast/ComparisonExpression.java | 12 +- 4 files changed, 277 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java index ebc32b6a247..beb0ccb16f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/SubqueryPlanner.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope; import org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.PlanAndMappings; +import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.TimePredicateWithSubqueryReconstructer; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -125,6 +126,7 @@ class SubqueryPlanner { List<SubqueryExpression> scalarSubqueries = subqueries.getSubqueries(); if (!scalarSubqueries.isEmpty()) { + tryFoldScalarSubqueryInTimePredicate(expression, plannerContext); for (Cluster<SubqueryExpression> cluster : cluster(builder.getScope(), selectSubqueries(builder, expression, scalarSubqueries))) { builder = planScalarSubquery(builder, cluster); @@ -151,6 +153,12 @@ class SubqueryPlanner { return builder; } + private void tryFoldScalarSubqueryInTimePredicate( + Expression expression, MPPQueryContext context) { + TimePredicateWithSubqueryReconstructer.reconstructTimePredicateWithSubquery( + expression, context); + } + /** * Find subqueries from the candidate set that are children of the given parent and that have not * already been handled in the subplan diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TestTsBlockBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TestTsBlockBuilder.java new file mode 100644 index 00000000000..5d8be688b2f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TestTsBlockBuilder.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.ir; + +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +import java.util.Arrays; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; + +public class TestTsBlockBuilder extends AbstractOperator { + private TsBlockBuilder resultBuilder; + private int positionCount = 9363; + + public TestTsBlockBuilder() { + this.resultBuilder = + new TsBlockBuilder(Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32)); + } + + public static void main(String[] args) { + TestTsBlockBuilder testTsBlockBuilder = new TestTsBlockBuilder(); + try { + while (testTsBlockBuilder.hasNext()) { + TsBlock tsBlock = testTsBlockBuilder.next(); + System.out.println(tsBlock); + } + testTsBlockBuilder.next(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + int probeIndex = 0; + while (probeIndex < positionCount) { + appendValueToResult(); + probeIndex++; + } + if (resultBuilder.isEmpty()) { + return null; + } + + resultTsBlock = + resultBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, resultBuilder.getPositionCount())); + resultBuilder.reset(); + return checkTsBlockSizeAndGetResult(); + } + + private void appendValueToResult() { + for (int i = 0; i < resultBuilder.getValueColumnBuilders().length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); + columnBuilder.writeInt(1); + } + resultBuilder.declarePositions(1); + } + + @Override + public boolean hasNext() throws Exception { + return true; + } + + @Override + public void close() throws Exception {} + + @Override + public boolean isFinished() throws Exception { + return false; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TimePredicateWithSubqueryReconstructer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TimePredicateWithSubqueryReconstructer.java new file mode 100644 index 00000000000..3f074d71dee --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/TimePredicateWithSubqueryReconstructer.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.ir; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; + +public class TimePredicateWithSubqueryReconstructer { + + private static final SqlParser relationSqlParser = new SqlParser(); + + private static final Coordinator coordinator = Coordinator.getInstance(); + + private TimePredicateWithSubqueryReconstructer() { + // utility class + } + + public static void reconstructTimePredicateWithSubquery( + Expression expression, MPPQueryContext context) { + if (expression instanceof LogicalExpression) { + LogicalExpression logicalExpression = (LogicalExpression) expression; + for (Expression term : logicalExpression.getTerms()) { + reconstructTimePredicateWithSubquery(term, context); + } + } else if (expression instanceof NotExpression) { + NotExpression notExpression = (NotExpression) expression; + reconstructTimePredicateWithSubquery(notExpression.getValue(), context); + } else if (expression instanceof ComparisonExpression) { + ComparisonExpression comparisonExpression = (ComparisonExpression) expression; + Expression left = comparisonExpression.getLeft(); + Expression right = comparisonExpression.getRight(); + if (left instanceof Identifier + && ((Identifier) left).getValue().equals("time") + && right instanceof SubqueryExpression) { + Optional<LongLiteral> result = + fetchSubqueryResultForTimePredicate((SubqueryExpression) right, context); + if (result.isPresent()) { + right = result.get(); + } + } else if (right instanceof Identifier + && ((Identifier) right).getValue().equals("time") + && left instanceof SubqueryExpression) { + Optional<LongLiteral> result = + fetchSubqueryResultForTimePredicate((SubqueryExpression) left, context); + if (result.isPresent()) { + left = result.get(); + } + } + comparisonExpression.setLeft(left); + comparisonExpression.setRight(right); + } + } + + private static Optional<LongLiteral> fetchSubqueryResultForTimePredicate( + SubqueryExpression subqueryExpression, MPPQueryContext context) { + final long queryId = SessionManager.getInstance().requestQueryId(); + Throwable t = null; + + try { + final ExecutionResult executionResult = + coordinator.executeForTableModel( + subqueryExpression.getQuery(), + relationSqlParser, + SessionManager.getInstance().getCurrSession(), + queryId, + SessionManager.getInstance() + .getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()), + "Fetch Subquery Result for Time Predicate", + LocalExecutionPlanner.getInstance().metadata, + context.getTimeOut(), + false); + + if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Optional.empty(); + } + + while (coordinator.getQueryExecution(queryId).hasNextResult()) { + final Optional<TsBlock> tsBlock; + try { + tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + } catch (final IoTDBException e) { + t = e; + throw new RuntimeException("Fetch Table Device Schema failed. ", e); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + continue; + } + final Column[] columns = tsBlock.get().getValueColumns(); + checkArgument(columns.length == 1, "Subquery result should have only one column."); + checkArgument( + tsBlock.get().getPositionCount() == 1 && !tsBlock.get().getColumn(0).isNull(0), + "Subquery result should have only one row."); + // result of subquery expression in time predicate must be numeric, otherwise Exception will + // be thrown + final long value = columns[0].getLong(0); + return Optional.of(new LongLiteral(Long.toString(value))); + } + } catch (final Throwable throwable) { + t = throwable; + } finally { + coordinator.cleanupQueryExecution(queryId, null, t); + } + return Optional.empty(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java index a70badc932a..5786fa26ccb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ComparisonExpression.java @@ -96,8 +96,8 @@ public class ComparisonExpression extends Expression { } private final Operator operator; - private final Expression left; - private final Expression right; + private Expression left; + private Expression right; public ComparisonExpression(Operator operator, Expression left, Expression right) { super(null); @@ -134,6 +134,14 @@ public class ComparisonExpression extends Expression { return right; } + public void setLeft(Expression left) { + this.left = left; + } + + public void setRight(Expression right) { + this.right = right; + } + @Override public <R, C> R accept(AstVisitor<R, C> visitor, C context) { return visitor.visitComparisonExpression(this, context);
