This is an automated email from the ASF dual-hosted git repository.

zhihao pushed a commit to branch perf/szh/push_limit_to_table_scan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b6ab28cb36cf8a795f75a50c9a84ae7aeccb98ee
Author: Sh-Zh-7 <[email protected]>
AuthorDate: Tue Mar 3 10:24:25 2026 +0800

    Support the push-predicate-through-project optimization.
---
 .../it/db/it/IoTDBWindowFunction3IT.java           |  3 +-
 .../distribute/TableDistributedPlanGenerator.java  |  6 +--
 .../iterative/rule/PruneTopKRankingColumns.java    | 27 +++++++------
 .../iterative/rule/PushFilterIntoRowNumber.java    |  3 +-
 .../PushPredicateThroughProjectIntoRowNumber.java  |  9 ++---
 .../PushPredicateThroughProjectIntoWindow.java     | 17 ++++----
 .../relational/planner/node/TopKRankingNode.java   | 32 +++++++--------
 .../optimizations/LogicalOptimizeFactory.java      |  4 +-
 .../planner/WindowFunctionOptimizationTest.java    | 45 ++++++++--------------
 9 files changed, 65 insertions(+), 81 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
index a0e23ea1996..60461e75b9a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java
@@ -231,8 +231,7 @@ public class IoTDBWindowFunction3IT {
     String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
     String[] retArray =
         new String[] {
-          "2021-01-01T09:05:00.000Z,d1,3.0,1,",
-          "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+          "2021-01-01T09:05:00.000Z,d1,3.0,1,", "2021-01-01T09:08:00.000Z,d2,2.0,1,",
         };
     tableResultSetEqualTest(
         "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY time ASC) as rn FROM demo) WHERE rn <= 1 ORDER BY device",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 26d301605b2..2f62ebc63f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -1908,8 +1908,7 @@ public class TableDistributedPlanGenerator
     if (parentRefs != null && !parentRefs.contains(node.getRowNumberSymbol())) 
{
       // If maxRowCountPerPartition is set, push it as a per-device limit to 
each
       // DeviceTableScanNode so that only the required number of rows are 
scanned.
-      node
-          .getMaxRowCountPerPartition()
+      node.getMaxRowCountPerPartition()
           .ifPresent(
               limit -> {
                 for (PlanNode child : childrenNodes) {
@@ -1930,8 +1929,7 @@ public class TableDistributedPlanGenerator
         return childrenNodes;
       } else {
         CollectNode collectNode =
-            new CollectNode(
-                queryId.genPlanNodeId(), 
childrenNodes.get(0).getOutputSymbols());
+            new CollectNode(queryId.genPlanNodeId(), 
childrenNodes.get(0).getOutputSymbols());
         childrenNodes.forEach(collectNode::addChild);
         return Collections.singletonList(collectNode);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java
index 5bd1ed47239..da49f3a04c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKRankingColumns.java
@@ -1,10 +1,11 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
 
-import com.google.common.collect.Streams;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
 
+import com.google.common.collect.Streams;
+
 import java.util.Optional;
 import java.util.Set;
 
@@ -12,23 +13,21 @@ import static 
com.google.common.collect.ImmutableSet.toImmutableSet;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.topNRanking;
 
-public class PruneTopKRankingColumns
-    extends ProjectOffPushDownRule<TopKRankingNode>
-{
-  public PruneTopKRankingColumns()
-  {
+public class PruneTopKRankingColumns extends 
ProjectOffPushDownRule<TopKRankingNode> {
+  public PruneTopKRankingColumns() {
     super(topNRanking());
   }
 
   @Override
-  protected Optional<PlanNode> pushDownProjectOff(Context context, 
TopKRankingNode topNRankingNode, Set<Symbol> referencedOutputs)
-  {
-    Set<Symbol> requiredInputs = Streams.concat(
-            referencedOutputs.stream()
-                .filter(symbol -> 
!symbol.equals(topNRankingNode.getRankingSymbol())),
-            topNRankingNode.getSpecification().getPartitionBy().stream(),
-            
topNRankingNode.getSpecification().getOrderingScheme().get().getOrderBy().stream())
-        .collect(toImmutableSet());
+  protected Optional<PlanNode> pushDownProjectOff(
+      Context context, TopKRankingNode topNRankingNode, Set<Symbol> 
referencedOutputs) {
+    Set<Symbol> requiredInputs =
+        Streams.concat(
+                referencedOutputs.stream()
+                    .filter(symbol -> 
!symbol.equals(topNRankingNode.getRankingSymbol())),
+                topNRankingNode.getSpecification().getPartitionBy().stream(),
+                
topNRankingNode.getSpecification().getOrderingScheme().get().getOrderBy().stream())
+            .collect(toImmutableSet());
 
     return restrictChildOutputs(context.getIdAllocator(), topNRankingNode, 
requiredInputs);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java
index 9f1514b199a..2aa90ba5670 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushFilterIntoRowNumber.java
@@ -67,7 +67,8 @@ public class PushFilterIntoRowNumber implements 
Rule<FilterNode> {
               source()
                   .matching(
                       rowNumber()
-                          .matching(rowNumber -> 
!rowNumber.getMaxRowCountPerPartition().isPresent())
+                          .matching(
+                              rowNumber -> 
!rowNumber.getMaxRowCountPerPartition().isPresent())
                           .capturedAs(CHILD)));
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java
index 55e7da20675..f0dbae59778 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoRowNumber.java
@@ -46,8 +46,8 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patte
 import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
 
 /**
- * Pushes a row-number upper-bound filter through an identity projection into 
{@link
- * RowNumberNode} by setting {@code maxRowCountPerPartition}.
+ * Pushes a row-number upper-bound filter through an identity projection into 
{@link RowNumberNode}
+ * by setting {@code maxRowCountPerPartition}.
  *
  * <p>Before:
  *
@@ -175,9 +175,8 @@ public class PushPredicateThroughProjectIntoRowNumber 
implements Rule<FilterNode
   /**
    * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the RowNumberNode 
with
    * maxRowCountPerPartition produces exactly the rows that satisfy the 
predicate (row numbers
-   * 1..N), so the filter can be removed. For {@code EQUAL} (e.g. {@code rn = 
5}), the
-   * RowNumberNode produces rows 1..5 but only rows where {@code rn = 5} are 
wanted, so the filter
-   * must be kept.
+   * 1..N), so the filter can be removed. For {@code EQUAL} (e.g. {@code rn = 
5}), the RowNumberNode
+   * produces rows 1..5 but only rows where {@code rn = 5} are wanted, so the 
filter must be kept.
    */
   private static boolean needToKeepFilter(Expression predicate) {
     if (!(predicate instanceof ComparisonExpression)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java
index aa401de2a46..21c17f7dced 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushPredicateThroughProjectIntoWindow.java
@@ -98,8 +98,7 @@ public class PushPredicateThroughProjectIntoWindow implements 
Rule<FilterNode> {
                                     .matching(
                                         window()
                                             .matching(
-                                                window ->
-                                                    
toTopNRankingType(window).isPresent())
+                                                window -> 
toTopNRankingType(window).isPresent())
                                             .capturedAs(WINDOW)))));
   }
 
@@ -118,8 +117,7 @@ public class PushPredicateThroughProjectIntoWindow 
implements Rule<FilterNode> {
       return Result.empty();
     }
 
-    OptionalInt upperBound =
-        extractUpperBoundFromComparison(filter.getPredicate(), rankingSymbol);
+    OptionalInt upperBound = 
extractUpperBoundFromComparison(filter.getPredicate(), rankingSymbol);
     if (!upperBound.isPresent()) {
       return Result.empty();
     }
@@ -149,8 +147,7 @@ public class PushPredicateThroughProjectIntoWindow 
implements Rule<FilterNode> {
     return Result.ofPlanNode(project);
   }
 
-  private OptionalInt extractUpperBoundFromComparison(
-      Expression predicate, Symbol rankingSymbol) {
+  private OptionalInt extractUpperBoundFromComparison(Expression predicate, 
Symbol rankingSymbol) {
     if (!(predicate instanceof ComparisonExpression)) {
       return OptionalInt.empty();
     }
@@ -188,10 +185,10 @@ public class PushPredicateThroughProjectIntoWindow 
implements Rule<FilterNode> {
   }
 
   /**
-   * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the TopKRankingNode 
produces exactly
-   * the rows that satisfy the predicate (ranking values 1..N), so the filter 
can be removed. For
-   * {@code EQUAL} (e.g. {@code rn = 5}), TopKRankingNode produces rows 1..5 
but only rows where
-   * {@code rn = 5} are wanted, so the filter must be kept.
+   * For {@code LESS_THAN} and {@code LESS_THAN_OR_EQUAL}, the TopKRankingNode 
produces exactly the
+   * rows that satisfy the predicate (ranking values 1..N), so the filter can 
be removed. For {@code
+   * EQUAL} (e.g. {@code rn = 5}), TopKRankingNode produces rows 1..5 but only 
rows where {@code rn
+   * = 5} are wanted, so the filter must be kept.
    */
   private static boolean needToKeepFilter(Expression predicate) {
     if (!(predicate instanceof ComparisonExpression)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
index 1dc799611ce..3b48ec48523 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java
@@ -90,13 +90,14 @@ public class TopKRankingNode extends SingleChildProcessNode 
{
 
   @Override
   public PlanNode clone() {
-    TopKRankingNode topKRankingNode = new TopKRankingNode(
-        getPlanNodeId(),
-        specification,
-        rankingType,
-        rankingSymbol,
-        maxRankingPerPartition,
-        partial);
+    TopKRankingNode topKRankingNode =
+        new TopKRankingNode(
+            getPlanNodeId(),
+            specification,
+            rankingType,
+            rankingSymbol,
+            maxRankingPerPartition,
+            partial);
     topKRankingNode.setDataPreSortedAndLimited(dataPreSortedAndLimited);
     return topKRankingNode;
   }
@@ -188,14 +189,15 @@ public class TopKRankingNode extends 
SingleChildProcessNode {
 
   @Override
   public PlanNode replaceChildren(List<PlanNode> newChildren) {
-    TopKRankingNode topKRankingNode = new TopKRankingNode(
-        id,
-        Iterables.getOnlyElement(newChildren),
-        specification,
-        rankingType,
-        rankingSymbol,
-        maxRankingPerPartition,
-        partial);
+    TopKRankingNode topKRankingNode =
+        new TopKRankingNode(
+            id,
+            Iterables.getOnlyElement(newChildren),
+            specification,
+            rankingType,
+            rankingSymbol,
+            maxRankingPerPartition,
+            partial);
     topKRankingNode.setDataPreSortedAndLimited(dataPreSortedAndLimited);
     return topKRankingNode;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index b3f398089a1..c5047f80468 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -80,11 +80,11 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownFilterIntoWindow;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushDownLimitIntoWindow;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushFilterIntoRowNumber;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoRowNumber;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoWindow;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughOffset;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughProject;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughUnion;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoRowNumber;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushPredicateThroughProjectIntoWindow;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushProjectionThroughUnion;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushTopKThroughUnion;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveDuplicateConditions;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index aed16f77c6e..0dccd601ac6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -31,18 +31,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limit;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.rowNumber;
@@ -50,6 +45,10 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.topKRanking;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class WindowFunctionOptimizationTest {
   @Test
@@ -326,8 +325,7 @@ public class WindowFunctionOptimizationTest {
 
     // Distributed plan: TopKRankingNode pushed down to each partition with 
limit push-down.
     // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes
-    assertPlan(
-        planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), 
exchange())));
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), 
exchange(), exchange())));
 
     // Worker fragments: TopKRankingNode -> DeviceTableScanNode
     // Verify limit is pushed to DeviceTableScanNode and TopKRankingNode is 
marked for streaming.
@@ -343,8 +341,7 @@ public class WindowFunctionOptimizationTest {
 
       PlanNode scanChild = topKNode.getChild();
       assertNotNull("TopKRankingNode should have a child", scanChild);
-      assertTrue(
-          "Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
+      assertTrue("Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
       DeviceTableScanNode dts = (DeviceTableScanNode) scanChild;
       assertTrue("pushLimitToEachDevice should be true", 
dts.isPushLimitToEachDevice());
       assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit());
@@ -366,8 +363,7 @@ public class WindowFunctionOptimizationTest {
     // Distributed plan: TopKRankingNode eliminated since rn is not in the 
output.
     // Limit is pushed to DeviceTableScanNode.
     // Fragment 0: OutputNode -> CollectNode -> ExchangeNodes
-    assertPlan(
-        planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), 
exchange())));
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), 
exchange(), exchange())));
 
     // Worker fragments: ProjectNode -> DeviceTableScanNode (no 
TopKRankingNode)
     for (int i = 1; i <= 2; i++) {
@@ -378,11 +374,9 @@ public class WindowFunctionOptimizationTest {
       assertPlan(planTester.getFragmentPlan(i), project(tableScan));
 
       assertTrue(
-          "Fragment " + i + " root should be ProjectNode",
-          fragmentRoot instanceof ProjectNode);
+          "Fragment " + i + " root should be ProjectNode", fragmentRoot 
instanceof ProjectNode);
       PlanNode scanChild = fragmentRoot.getChildren().get(0);
-      assertTrue(
-          "Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
+      assertTrue("Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
       DeviceTableScanNode dts = (DeviceTableScanNode) scanChild;
       assertTrue("pushLimitToEachDevice should be true", 
dts.isPushLimitToEachDevice());
       assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit());
@@ -462,8 +456,7 @@ public class WindowFunctionOptimizationTest {
 
     // Distributed plan: RowNumberNode eliminated since rn is not in the 
output.
     // Limit (maxRowCountPerPartition=2) is pushed to each DeviceTableScanNode.
-    assertPlan(
-        planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), 
exchange())));
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), 
exchange(), exchange())));
 
     // Worker fragments: ProjectNode -> DeviceTableScanNode (no RowNumberNode)
     for (int i = 1; i <= 2; i++) {
@@ -474,11 +467,9 @@ public class WindowFunctionOptimizationTest {
       assertPlan(planTester.getFragmentPlan(i), project(tableScan));
 
       assertTrue(
-          "Fragment " + i + " root should be ProjectNode",
-          fragmentRoot instanceof ProjectNode);
+          "Fragment " + i + " root should be ProjectNode", fragmentRoot 
instanceof ProjectNode);
       PlanNode scanChild = fragmentRoot.getChildren().get(0);
-      assertTrue(
-          "Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
+      assertTrue("Child should be DeviceTableScanNode", scanChild instanceof 
DeviceTableScanNode);
       DeviceTableScanNode dts = (DeviceTableScanNode) scanChild;
       assertTrue("pushLimitToEachDevice should be true", 
dts.isPushLimitToEachDevice());
       assertEquals("pushDownLimit should be 2", 2, dts.getPushDownLimit());
@@ -495,15 +486,15 @@ public class WindowFunctionOptimizationTest {
     LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
     PlanMatchPattern tableScan = tableScan("testdb.table1");
 
-    // Logical plan: RowNumberNode with maxRowCount=2 (filter absorbed), no 
outer project removing rn
+    // Logical plan: RowNumberNode with maxRowCount=2 (filter absorbed), no 
outer project removing
+    // rn
     assertPlan(logicalQueryPlan, output(rowNumber(group(tableScan))));
 
     // Worker fragments should still have RowNumberNode since rn IS in the 
output
     for (int i = 1; i <= 2; i++) {
       PlanNode fragmentRoot = planTester.getFragmentPlan(i);
       assertTrue(
-          "Fragment " + i + " root should be RowNumberNode",
-          fragmentRoot instanceof RowNumberNode);
+          "Fragment " + i + " root should be RowNumberNode", fragmentRoot 
instanceof RowNumberNode);
     }
   }
 
@@ -548,9 +539,7 @@ public class WindowFunctionOptimizationTest {
     assertPlan(logicalQueryPlan, 
output(filter(topKRanking(group(tableScan)))));
 
     // Distributed plan: TopKRanking and filter pushed down
-    assertPlan(
-        planTester.getFragmentPlan(0),
-        output(collect(exchange(), exchange(), exchange())));
+    assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), 
exchange(), exchange())));
     assertPlan(planTester.getFragmentPlan(1), filter(topKRanking(tableScan)));
     assertPlan(planTester.getFragmentPlan(2), filter(topKRanking(tableScan)));
   }

Reply via email to