This is an automated email from the ASF dual-hosted git repository. zhihao pushed a commit to branch perf/szh/push_limit_to_table_scan in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6ab28cb36cf8a795f75a50c9a84ae7aeccb98ee Author: Sh-Zh-7 <[email protected]> AuthorDate: Tue Mar 3 10:24:25 2026 +0800 Support push predicate through project optimization. --- .../it/db/it/IoTDBWindowFunction3IT.java | 3 +- .../distribute/TableDistributedPlanGenerator.java | 6 +-- .../iterative/rule/PruneTopKRankingColumns.java | 27 +++++++------ .../iterative/rule/PushFilterIntoRowNumber.java | 3 +- .../PushPredicateThroughProjectIntoRowNumber.java | 9 ++--- .../PushPredicateThroughProjectIntoWindow.java | 17 ++++---- .../relational/planner/node/TopKRankingNode.java | 32 +++++++-------- .../optimizations/LogicalOptimizeFactory.java | 4 +- .../planner/WindowFunctionOptimizationTest.java | 45 ++++++++-------------- 9 files changed, 65 insertions(+), 81 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java index a0e23ea1996..60461e75b9a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java @@ -231,8 +231,7 @@ public class IoTDBWindowFunction3IT { String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; String[] retArray = new String[] { - "2021-01-01T09:05:00.000Z,d1,3.0,1,", - "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:05:00.000Z,d1,3.0,1,", "2021-01-01T09:08:00.000Z,d2,2.0,1,", }; tableResultSetEqualTest( "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time ASC) as rn FROM demo) WHERE rn <= 1 ORDER BY device", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 26d301605b2..2f62ebc63f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -1908,8 +1908,7 @@ public class TableDistributedPlanGenerator if (parentRefs != null && !parentRefs.contains(node.getRowNumberSymbol())) { // If maxRowCountPerPartition is set, push it as a per-device limit to each // DeviceTableScanNode so that only the required number of rows are scanned. - node - .getMaxRowCountPerPartition() + node.getMaxRowCountPerPartition() .ifPresent( limit -> { for (PlanNode child : childrenNodes) { @@ -1930,8 +1929,7 @@ public class TableDistributedPlanGenerator return childrenNodes; } else { CollectNode collectNode = - new CollectNode( - queryId.genPlanNodeId(), childrenNodes.get(0).getOutputSymbols()); + new CollectNode(queryId.genPlanNodeId(), childrenNodes.get(0).getOutputSymbols()); childrenNodes.forEach(collectNode::addChild); return Collections.singletonList(collectNode); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java index 5bd1ed47239..da49f3a04c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java @@ -1,10 +1,11 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; -import com.google.common.collect.Streams; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import com.google.common.collect.Streams; + import java.util.Optional; import java.util.Set; @@ -12,23 +13,21 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.topNRanking; -public class PruneTopKRankingColumns - extends ProjectOffPushDownRule<TopKRankingNode> -{ - public PruneTopKRankingColumns() - { +public class PruneTopKRankingColumns extends ProjectOffPushDownRule<TopKRankingNode> { + public PruneTopKRankingColumns() { super(topNRanking()); } @Override - protected Optional<PlanNode> pushDownProjectOff(Context context, TopKRankingNode topNRankingNode, Set<Symbol> referencedOutputs) - { - Set<Symbol> requiredInputs = Streams.concat( - referencedOutputs.stream() - .filter(symbol -> !symbol.equals(topNRankingNode.getRankingSymbol())), - topNRankingNode.getSpecification().getPartitionBy().stream(), - topNRankingNode.getSpecification().getOrderingScheme().get().getOrderBy().stream()) - .collect(toImmutableSet()); + protected Optional<PlanNode> pushDownProjectOff( + Context context, TopKRankingNode topNRankingNode, Set<Symbol> referencedOutputs) { + Set<Symbol> requiredInputs = + Streams.concat( + referencedOutputs.stream() + .filter(symbol -> !symbol.equals(topNRankingNode.getRankingSymbol())), + topNRankingNode.getSpecification().getPartitionBy().stream(), + topNRankingNode.getSpecification().getOrderingScheme().get().getOrderBy().stream()) + .collect(toImmutableSet()); return restrictChildOutputs(context.getIdAllocator(), topNRankingNode, requiredInputs); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java index 9f1514b199a..2aa90ba5670 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java @@ -67,7 +67,8 @@ public class PushFilterIntoRowNumber implements Rule<FilterNode> { source() .matching( rowNumber() - .matching(rowNumber -> !rowNumber.getMaxRowCountPerPartition().isPresent()) + .matching( + rowNumber -> !rowNumber.getMaxRowCountPerPartition().isPresent()) .capturedAs(CHILD))); @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java index 55e7da20675..f0dbae59778 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java @@ -46,8 +46,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patte import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; /** - * Pushes a row-number upper-bound filter through an identity projection into {@link - * RowNumberNode} by setting {@code maxRowCountPerPartition}. + * Pushes a row-number upper-bound filter through an identity projection into {@link RowNumberNode} + * by setting {@code maxRowCountPerPartition}. * * <p>Before: * @@ -175,9 +175,8 @@ public class PushPredicateThroughProjectIntoRowNumber implements Rule<FilterNode /** * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the RowNumberNode with * maxRowCountPerPartition produces exactly the rows that satisfy the predicate (row numbers - * 1..N), so the filter can be removed. For {@code EQUAL} (e.g. {@code rn = 5}), the - * RowNumberNode produces rows 1..5 but only rows where {@code rn = 5} are wanted, so the filter - * must be kept. + * 1..N), so the filter can be removed. For {@code EQUAL} (e.g. {@code rn = 5}), the RowNumberNode + * produces rows 1..5 but only rows where {@code rn = 5} are wanted, so the filter must be kept. */ private static boolean needToKeepFilter(Expression predicate) { if (!(predicate instanceof ComparisonExpression)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java index aa401de2a46..21c17f7dced 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java @@ -98,8 +98,7 @@ public class PushPredicateThroughProjectIntoWindow implements Rule<FilterNode> { .matching( window() .matching( - window -> - toTopNRankingType(window).isPresent()) + window -> toTopNRankingType(window).isPresent()) .capturedAs(WINDOW))))); } @@ -118,8 +117,7 @@ public class PushPredicateThroughProjectIntoWindow implements Rule<FilterNode> { return Result.empty(); } - OptionalInt upperBound = - extractUpperBoundFromComparison(filter.getPredicate(), rankingSymbol); + OptionalInt upperBound = extractUpperBoundFromComparison(filter.getPredicate(), rankingSymbol); if (!upperBound.isPresent()) { return Result.empty(); } @@ -149,8 +147,7 @@ public class PushPredicateThroughProjectIntoWindow implements Rule<FilterNode> { return Result.ofPlanNode(project); } - private OptionalInt extractUpperBoundFromComparison( - Expression predicate, Symbol rankingSymbol) { + private OptionalInt extractUpperBoundFromComparison(Expression predicate, Symbol rankingSymbol) { if (!(predicate instanceof ComparisonExpression)) { return OptionalInt.empty(); } @@ -188,10 +185,10 @@ public class PushPredicateThroughProjectIntoWindow implements Rule<FilterNode> { } /** - * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the TopKRankingNode produces exactly - * the rows that satisfy the predicate (ranking values 1..N), so the filter can be removed. For - * {@code EQUAL} (e.g. {@code rn = 5}), TopKRankingNode produces rows 1..5 but only rows where - * {@code rn = 5} are wanted, so the filter must be kept. + * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the TopKRankingNode produces exactly the + * rows that satisfy the predicate (ranking values 1..N), so the filter can be removed. For {@code + * EQUAL} (e.g. {@code rn = 5}), TopKRankingNode produces rows 1..5 but only rows where {@code rn + * = 5} are wanted, so the filter must be kept. */ private static boolean needToKeepFilter(Expression predicate) { if (!(predicate instanceof ComparisonExpression)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java index 1dc799611ce..3b48ec48523 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java @@ -90,13 +90,14 @@ public class TopKRankingNode extends SingleChildProcessNode { @Override public PlanNode clone() { - TopKRankingNode topKRankingNode = new TopKRankingNode( - getPlanNodeId(), - specification, - rankingType, - rankingSymbol, - maxRankingPerPartition, - partial); + TopKRankingNode topKRankingNode = + new TopKRankingNode( + getPlanNodeId(), + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); topKRankingNode.setDataPreSortedAndLimited(dataPreSortedAndLimited); return topKRankingNode; } @@ -188,14 +189,15 @@ public class TopKRankingNode extends SingleChildProcessNode { @Override public PlanNode replaceChildren(List<PlanNode> newChildren) { - TopKRankingNode topKRankingNode = new TopKRankingNode( - id, - Iterables.getOnlyElement(newChildren), - specification, - rankingType, - rankingSymbol, - maxRankingPerPartition, - partial); + TopKRankingNode topKRankingNode = + new TopKRankingNode( + id, + Iterables.getOnlyElement(newChildren), + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); topKRankingNode.setDataPreSortedAndLimited(dataPreSortedAndLimited); return topKRankingNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index b3f398089a1..c5047f80468 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -80,11 +80,11 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownFilterIntoWindow; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownLimitIntoWindow; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushFilterIntoRowNumber; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoRowNumber; -import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoWindow; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughOffset; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughProject; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughUnion; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoRowNumber; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoWindow; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushProjectionThroughUnion; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushTopKThroughUnion; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveDuplicateConditions; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java index aed16f77c6e..0dccd601ac6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java @@ -31,18 +31,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.junit.Test; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.rowNumber; @@ -50,6 +45,10 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.topKRanking; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class WindowFunctionOptimizationTest { @Test @@ -326,8 +325,7 @@ public class WindowFunctionOptimizationTest { // Distributed plan: TopKRankingNode pushed down to each partition with limit push-down. // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes - assertPlan( - planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); // Worker fragments: TopKRankingNode -> DeviceTableScanNode // Verify limit is pushed to DeviceTableScanNode and TopKRankingNode is marked for streaming. @@ -343,8 +341,7 @@ public class WindowFunctionOptimizationTest { PlanNode scanChild = topKNode.getChild(); assertNotNull("TopKRankingNode should have a child", scanChild); - assertTrue( - "Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); + assertTrue("Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); DeviceTableScanNode dts = (DeviceTableScanNode) scanChild; assertTrue("pushLimitToEachDevice should be true", dts.isPushLimitToEachDevice()); assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit()); @@ -366,8 +363,7 @@ public class WindowFunctionOptimizationTest { // Distributed plan: TopKRankingNode eliminated since rn is not in the output. // Limit is pushed to DeviceTableScanNode. // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes - assertPlan( - planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); // Worker fragments: ProjectNode -> DeviceTableScanNode (no TopKRankingNode) for (int i = 1; i <= 2; i++) { @@ -378,11 +374,9 @@ public class WindowFunctionOptimizationTest { assertPlan(planTester.getFragmentPlan(i), project(tableScan)); assertTrue( - "Fragment " + i + " root should be ProjectNode", - fragmentRoot instanceof ProjectNode); + "Fragment " + i + " root should be ProjectNode", fragmentRoot instanceof ProjectNode); PlanNode scanChild = fragmentRoot.getChildren().get(0); - assertTrue( - "Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); + assertTrue("Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); DeviceTableScanNode dts = (DeviceTableScanNode) scanChild; assertTrue("pushLimitToEachDevice should be true", dts.isPushLimitToEachDevice()); assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit()); @@ -462,8 +456,7 @@ public class WindowFunctionOptimizationTest { // Distributed plan: RowNumberNode eliminated since rn is not in the output. // Limit (maxRowCountPerPartition=2) is pushed to each DeviceTableScanNode. - assertPlan( - planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); // Worker fragments: ProjectNode -> DeviceTableScanNode (no RowNumberNode) for (int i = 1; i <= 2; i++) { @@ -474,11 +467,9 @@ public class WindowFunctionOptimizationTest { assertPlan(planTester.getFragmentPlan(i), project(tableScan)); assertTrue( - "Fragment " + i + " root should be ProjectNode", - fragmentRoot instanceof ProjectNode); + "Fragment " + i + " root should be ProjectNode", fragmentRoot instanceof ProjectNode); PlanNode scanChild = fragmentRoot.getChildren().get(0); - assertTrue( - "Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); + assertTrue("Child should be DeviceTableScanNode", scanChild instanceof DeviceTableScanNode); DeviceTableScanNode dts = (DeviceTableScanNode) scanChild; assertTrue("pushLimitToEachDevice should be true", dts.isPushLimitToEachDevice()); assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit()); @@ -495,15 +486,15 @@ public class WindowFunctionOptimizationTest { LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); PlanMatchPattern tableScan = tableScan("testdb.table1"); - // Logical plan: RowNumberNode with maxRowCount=2 (filter absorbed), no outer project removing rn + // Logical plan: RowNumberNode with maxRowCount=2 (filter absorbed), no outer project removing + // rn assertPlan(logicalQueryPlan, output(rowNumber(group(tableScan)))); // Worker fragments should still have RowNumberNode since rn IS in the output for (int i = 1; i <= 2; i++) { PlanNode fragmentRoot = planTester.getFragmentPlan(i); assertTrue( - "Fragment " + i + " root should be RowNumberNode", - fragmentRoot instanceof RowNumberNode); + "Fragment " + i + " root should be RowNumberNode", fragmentRoot instanceof RowNumberNode); } } @@ -548,9 +539,7 @@ public class WindowFunctionOptimizationTest { assertPlan(logicalQueryPlan, output(filter(topKRanking(group(tableScan))))); // Distributed plan: TopKRanking and filter pushed down - assertPlan( - planTester.getFragmentPlan(0), - output(collect(exchange(), exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); assertPlan(planTester.getFragmentPlan(1), filter(topKRanking(tableScan))); assertPlan(planTester.getFragmentPlan(2), filter(topKRanking(tableScan))); }
