This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c80b8fd9aa [Multi-stage] Clean up unnecessary checks in rules (#14066)
c80b8fd9aa is described below

commit c80b8fd9aae8af28f97f62beb07721ac1335a394
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Sep 24 11:34:50 2024 -0700

    [Multi-stage] Clean up unnecessary checks in rules (#14066)
---
 .../PinotAggregateExchangeNodeInsertRule.java      |   2 +
 .../rel/rules/PinotAggregateToSemiJoinRule.java    |  19 +---
 .../rel/rules/PinotEvaluateLiteralRule.java        |   2 +-
 .../rel/rules/PinotExchangeEliminationRule.java    |  12 ++-
 .../rel/rules/PinotFilterExpandSearchRule.java     |  16 +---
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java |  21 ++---
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |  45 ++++------
 .../rel/rules/PinotRelDistributionTraitRule.java   |   5 --
 .../rules/PinotSetOpExchangeNodeInsertRule.java    |  43 ++-------
 .../rules/PinotSingleValueAggregateRemoveRule.java |  18 ++--
 .../rel/rules/PinotSortExchangeNodeInsertRule.java |  25 ++----
 .../rules/PinotWindowExchangeNodeInsertRule.java   | 100 ++++++++++-----------
 12 files changed, 110 insertions(+), 198 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 9d7b821123..1c66f9a648 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -88,6 +88,8 @@ public class PinotAggregateExchangeNodeInsertRule extends 
RelOptRule {
       new 
PinotAggregateExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotAggregateExchangeNodeInsertRule(RelBuilderFactory factory) {
+    // NOTE: Explicitly match for LogicalAggregate because after applying the 
rule, LogicalAggregate is replaced with
+    //       PinotLogicalAggregate, and the rule won't be applied again.
     super(operand(LogicalAggregate.class, any()), factory, null);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java
index e93609a38a..327921df71 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateToSemiJoinRule.java
@@ -29,7 +29,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -40,8 +39,7 @@ import org.apache.calcite.util.ImmutableIntList;
 
 
 /**
- * SemiJoinRule that matches an Aggregate on top of a Join with an Aggregate
- * as its right child.
+ * SemiJoinRule that matches an Aggregate on top of a Join with an Aggregate 
as its right child.
  *
  * @see CoreRules#PROJECT_TO_SEMI_JOIN
  */
@@ -50,18 +48,9 @@ public class PinotAggregateToSemiJoinRule extends RelOptRule 
{
       new PinotAggregateToSemiJoinRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotAggregateToSemiJoinRule(RelBuilderFactory factory) {
-    super(operand(LogicalAggregate.class, any()), factory, null);
-  }
-
-  @Override
-  @SuppressWarnings("rawtypes")
-  public boolean matches(RelOptRuleCall call) {
-    final Aggregate topAgg = call.rel(0);
-    if (!PinotRuleUtils.isJoin(topAgg.getInput())) {
-      return false;
-    }
-    final Join join = (Join) PinotRuleUtils.unboxRel(topAgg.getInput());
-    return PinotRuleUtils.isAggregate(join.getInput(1));
+    super(operand(Aggregate.class,
+            some(operand(Join.class, some(operand(RelNode.class, any()), 
operand(Aggregate.class, any()))))), factory,
+        null);
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java
index 1d7f15ec5d..12aaef1d97 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotEvaluateLiteralRule.java
@@ -97,7 +97,7 @@ public class PinotEvaluateLiteralRule {
       }
       castedNewProjects.add(newNode);
     }
-    return needCast ? LogicalProject.create(oldProject.getInput(), 
oldProject.getHints(), castedNewProjects,
+    return needCast ? oldProject.copy(oldProject.getTraitSet(), 
oldProject.getInput(), castedNewProjects,
         oldProject.getRowType()) : newProject;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java
index e6ba7b9c51..5c8dfb0ba0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotExchangeEliminationRule.java
@@ -18,11 +18,12 @@
  */
 package org.apache.pinot.calcite.rel.rules;
 
-import java.util.Collections;
+import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
 
@@ -36,17 +37,14 @@ public class PinotExchangeEliminationRule extends 
RelOptRule {
       new PinotExchangeEliminationRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotExchangeEliminationRule(RelBuilderFactory factory) {
-    super(operand(PinotLogicalExchange.class,
-        some(operand(PinotLogicalExchange.class, some(operand(RelNode.class, 
any()))))), factory, null);
+    super(operand(Exchange.class, some(operand(Exchange.class, 
some(operand(RelNode.class, any()))))), factory, null);
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    PinotLogicalExchange exchange0 = call.rel(0);
-    PinotLogicalExchange exchange1 = call.rel(1);
+    Exchange exchange0 = call.rel(0);
     RelNode input = call.rel(2);
     // convert the call to skip the exchange.
-    RelNode rel = exchange0.copy(input.getTraitSet(), 
Collections.singletonList(input));
-    call.transformTo(rel);
+    call.transformTo(exchange0.copy(input.getTraitSet(), List.of(input)));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java
index 1a7e00f6bb..1f34913c19 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotFilterExpandSearchRule.java
@@ -21,7 +21,6 @@ package org.apache.pinot.calcite.rel.rules;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
@@ -33,27 +32,20 @@ public class PinotFilterExpandSearchRule extends RelOptRule 
{
       new PinotFilterExpandSearchRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotFilterExpandSearchRule(RelBuilderFactory factory) {
-    super(operand(LogicalFilter.class, any()), factory, null);
+    super(operand(Filter.class, any()), factory, null);
   }
 
   @Override
-  @SuppressWarnings("rawtypes")
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof Filter) {
-      Filter filter = call.rel(0);
-      return containsRangeSearch(filter.getCondition());
-    }
-    return false;
+    Filter filter = call.rel(0);
+    return containsRangeSearch(filter.getCondition());
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     Filter filter = call.rel(0);
     RexNode newCondition = 
RexUtil.expandSearch(filter.getCluster().getRexBuilder(), null, 
filter.getCondition());
-    call.transformTo(LogicalFilter.create(filter.getInput(), newCondition));
+    call.transformTo(filter.copy(filter.getTraitSet(), filter.getInput(), 
newCondition));
   }
 
   private boolean containsRangeSearch(RexNode condition) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 1b485551f8..37f12fbd08 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -18,14 +18,12 @@
  */
 package org.apache.pinot.calcite.rel.rules;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
 
@@ -38,19 +36,13 @@ public class PinotJoinExchangeNodeInsertRule extends 
RelOptRule {
       new PinotJoinExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotJoinExchangeNodeInsertRule(RelBuilderFactory factory) {
-    super(operand(LogicalJoin.class, any()), factory, null);
+    super(operand(Join.class, any()), factory, null);
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof Join) {
-      Join join = call.rel(0);
-      return !PinotRuleUtils.isExchange(join.getLeft()) && 
!PinotRuleUtils.isExchange(join.getRight());
-    }
-    return false;
+    Join join = call.rel(0);
+    return !PinotRuleUtils.isExchange(join.getLeft()) && 
!PinotRuleUtils.isExchange(join.getRight());
   }
 
   @Override
@@ -73,10 +65,7 @@ public class PinotJoinExchangeNodeInsertRule extends 
RelOptRule {
       rightExchange = PinotLogicalExchange.create(rightInput, 
RelDistributions.hash(joinInfo.rightKeys));
     }
 
-    RelNode newJoinNode =
-        new LogicalJoin(join.getCluster(), join.getTraitSet(), 
join.getHints(), leftExchange, rightExchange,
-            join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone(),
-            ImmutableList.copyOf(join.getSystemFieldList()));
-    call.transformTo(newJoinNode);
+    call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), 
leftExchange, rightExchange, join.getJoinType(),
+        join.isSemiJoinDone()));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index f8bbeb63bf..ed86a6fcc2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.calcite.rel.rules;
 
-import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
@@ -31,7 +30,6 @@ import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
@@ -121,27 +119,23 @@ public class PinotJoinToDynamicBroadcastRule extends 
RelOptRule {
       new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
-    super(operand(LogicalJoin.class, any()), factory, null);
+    super(operand(Join.class, any()), factory, null);
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
-      return false;
-    }
     Join join = call.rel(0);
-    String joinStrategyString = 
PinotHintStrategyTable.getHintOption(join.getHints(),
-        PinotHintOptions.JOIN_HINT_OPTIONS, 
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
-    List<String> joinStrategies = joinStrategyString != null ? 
StringUtils.split(joinStrategyString, ",")
-        : Collections.emptyList();
-    boolean explicitOtherStrategy = joinStrategies.size() > 0
-        && 
!joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);
+    String joinStrategyString =
+        PinotHintStrategyTable.getHintOption(join.getHints(), 
PinotHintOptions.JOIN_HINT_OPTIONS,
+            PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
+    List<String> joinStrategies =
+        joinStrategyString != null ? StringUtils.split(joinStrategyString, 
",") : Collections.emptyList();
+    boolean explicitOtherStrategy = !joinStrategies.isEmpty() && 
!joinStrategies.contains(
+        PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);
 
     JoinInfo joinInfo = join.analyzeCondition();
-    RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) 
join.getLeft()).getCurrentRel()
-        : join.getLeft();
-    RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) 
join.getRight()).getCurrentRel()
-        : join.getRight();
+    RelNode left = ((HepRelVertex) join.getLeft()).getCurrentRel();
+    RelNode right = ((HepRelVertex) join.getRight()).getCurrentRel();
     return left instanceof Exchange && right instanceof Exchange
         // left side can be pushed as dynamic exchange
         && PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
@@ -155,16 +149,15 @@ public class PinotJoinToDynamicBroadcastRule extends 
RelOptRule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     Join join = call.rel(0);
-    PinotLogicalExchange left = (PinotLogicalExchange) (join.getLeft() 
instanceof HepRelVertex
-        ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
-    PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() 
instanceof HepRelVertex
-        ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
+    Exchange left = (Exchange) ((HepRelVertex) join.getLeft()).getCurrentRel();
+    Exchange right = (Exchange) ((HepRelVertex) 
join.getRight()).getCurrentRel();
 
     // when colocated join hint is given, dynamic broadcast exchange can be 
hash-distributed b/c
     //    1. currently, dynamic broadcast only works against main table off 
leaf-stage; (e.g. receive node on leaf)
     //    2. when hash key are the same but hash functions are different, it 
can be done via normal hash shuffle.
-    boolean isColocatedJoin = 
PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
-        PinotHintOptions.JOIN_HINT_OPTIONS, 
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+    boolean isColocatedJoin =
+        PinotHintStrategyTable.isHintOptionTrue(join.getHints(), 
PinotHintOptions.JOIN_HINT_OPTIONS,
+            PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
     PinotLogicalExchange dynamicBroadcastExchange;
     RelNode rightInput = right.getInput();
     if (isColocatedJoin) {
@@ -174,10 +167,8 @@ public class PinotJoinToDynamicBroadcastRule extends 
RelOptRule {
       RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED;
       dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, 
PinotRelExchangeType.PIPELINE_BREAKER);
     }
-    Join dynamicFilterJoin =
-        new LogicalJoin(join.getCluster(), join.getTraitSet(), 
left.getInput(), dynamicBroadcastExchange,
-            join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone(),
-            ImmutableList.copyOf(join.getSystemFieldList()));
-    call.transformTo(dynamicFilterJoin);
+
+    call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), 
left.getInput(), dynamicBroadcastExchange,
+        join.getJoinType(), join.isSemiJoinDone()));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
index f2aa72da84..22a37a5f91 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -68,11 +68,6 @@ public class PinotRelDistributionTraitRule extends 
RelOptRule {
     super(operand(RelNode.class, any()), factory, null);
   }
 
-  @Override
-  public boolean matches(RelOptRuleCall call) {
-    return call.rels.length >= 1;
-  }
-
   @Override
   public void onMatch(RelOptRuleCall call) {
     RelNode current = call.rel(0);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
index 425fe09335..3db58c048c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
@@ -20,17 +20,13 @@ package org.apache.pinot.calcite.rel.rules;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-import org.apache.calcite.rel.logical.LogicalMinus;
-import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
 
 
@@ -47,41 +43,20 @@ public class PinotSetOpExchangeNodeInsertRule extends 
RelOptRule {
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof SetOp) {
-      SetOp setOp = call.rel(0);
-      for (RelNode input : setOp.getInputs()) {
-        if (PinotRuleUtils.isExchange(input)) {
-          return false;
-        }
-      }
-      return true;
-    }
-    return false;
+    SetOp setOp = call.rel(0);
+    return !PinotRuleUtils.isExchange(setOp.getInput(0));
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     SetOp setOp = call.rel(0);
-    List<RelNode> newInputs = new ArrayList<>();
-    List<Integer> hashFields =
-        IntStream.range(0, 
setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new));
-    for (RelNode input : setOp.getInputs()) {
-      RelNode exchange = PinotLogicalExchange.create(input, 
RelDistributions.hash(hashFields));
+    List<RelNode> inputs = setOp.getInputs();
+    List<RelNode> newInputs = new ArrayList<>(inputs.size());
+    for (RelNode input : inputs) {
+      RelNode exchange = PinotLogicalExchange.create(input,
+          RelDistributions.hash(ImmutableIntList.range(0, 
setOp.getRowType().getFieldCount())));
       newInputs.add(exchange);
     }
-    SetOp newSetOpNode;
-    if (setOp instanceof LogicalUnion) {
-      newSetOpNode = new LogicalUnion(setOp.getCluster(), setOp.getTraitSet(), 
newInputs, setOp.all);
-    } else if (setOp instanceof LogicalIntersect) {
-      newSetOpNode = new LogicalIntersect(setOp.getCluster(), 
setOp.getTraitSet(), newInputs, setOp.all);
-    } else if (setOp instanceof LogicalMinus) {
-      newSetOpNode = new LogicalMinus(setOp.getCluster(), setOp.getTraitSet(), 
newInputs, setOp.all);
-    } else {
-      throw new UnsupportedOperationException("Unsupported set op node: " + 
setOp);
-    }
-    call.transformTo(newSetOpNode);
+    call.transformTo(setOp.copy(setOp.getTraitSet(), newInputs));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java
index 29ef717136..de271cc69a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSingleValueAggregateRemoveRule.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pinot.calcite.rel.rules;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.hep.HepRelVertex;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 
@@ -37,23 +36,22 @@ public class PinotSingleValueAggregateRemoveRule extends 
RelOptRule {
       new 
PinotSingleValueAggregateRemoveRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotSingleValueAggregateRemoveRule(RelBuilderFactory factory) {
-    super(operand(LogicalAggregate.class, any()), factory, null);
+    super(operand(Aggregate.class, any()), factory, null);
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    final Aggregate agg = call.rel(0);
-    if (agg.getAggCallList().size() != 1) {
+    Aggregate agg = call.rel(0);
+    List<AggregateCall> aggCalls = agg.getAggCallList();
+    if (aggCalls.size() != 1) {
       return false;
     }
-    final AggregateCall aggCall = agg.getAggCallList().get(0);
-    return aggCall.getAggregation().getName().equals("SINGLE_VALUE");
+    return aggCalls.get(0).getAggregation().getName().equals("SINGLE_VALUE");
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final Aggregate agg = call.rel(0);
-    final RelNode input = ((HepRelVertex) agg.getInput()).getCurrentRel();
-    call.transformTo(input);
+    Aggregate agg = call.rel(0);
+    call.transformTo(((HepRelVertex) agg.getInput()).getCurrentRel());
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index 2536b0ed01..1069934c2f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange;
 
@@ -44,19 +43,13 @@ public class PinotSortExchangeNodeInsertRule extends 
RelOptRule {
       new PinotSortExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
 
   public PinotSortExchangeNodeInsertRule(RelBuilderFactory factory) {
-    super(operand(LogicalSort.class, any()), factory, null);
+    super(operand(Sort.class, any()), factory, null);
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof Sort) {
-      Sort sort = call.rel(0);
-      return !PinotRuleUtils.isExchange(sort.getInput());
-    }
-    return false;
+    Sort sort = call.rel(0);
+    return !PinotRuleUtils.isExchange(sort.getInput());
   }
 
   @Override
@@ -65,12 +58,10 @@ public class PinotSortExchangeNodeInsertRule extends 
RelOptRule {
     // TODO: Assess whether sorting is needed on both sender and receiver side 
or only receiver side. Potentially add
     //       SqlHint support to determine this. For now setting sort only on 
receiver side as sender side sorting is
     //       not yet implemented.
-    PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create(
-        sort.getInput(),
-        RelDistributions.hash(Collections.emptyList()),
-        sort.getCollation(),
-        false,
-        !sort.getCollation().getKeys().isEmpty());
-    call.transformTo(LogicalSort.create(exchange, sort.getCollation(), 
sort.offset, sort.fetch));
+    // TODO: Revisit whether we should use hash distribution
+    PinotLogicalSortExchange exchange =
+        PinotLogicalSortExchange.create(sort.getInput(), 
RelDistributions.hash(Collections.emptyList()),
+            sort.getCollation(), false, 
!sort.getCollation().getKeys().isEmpty());
+    call.transformTo(sort.copy(sort.getTraitSet(), exchange, 
sort.getCollation()));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index 64abb437be..2050aa3b8a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -20,7 +20,6 @@ package org.apache.pinot.calcite.rel.rules;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +31,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -65,66 +65,59 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
 
   // Supported window functions
   // OTHER_FUNCTION supported are: BOOL_AND, BOOL_OR
-  private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = 
ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
-      SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.ROW_NUMBER, 
SqlKind.RANK, SqlKind.DENSE_RANK,
-      SqlKind.LAG, SqlKind.LEAD, SqlKind.FIRST_VALUE, SqlKind.LAST_VALUE, 
SqlKind.OTHER_FUNCTION);
+  private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND =
+      Set.of(SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, 
SqlKind.COUNT, SqlKind.ROW_NUMBER, SqlKind.RANK,
+          SqlKind.DENSE_RANK, SqlKind.LAG, SqlKind.LEAD, SqlKind.FIRST_VALUE, 
SqlKind.LAST_VALUE,
+          SqlKind.OTHER_FUNCTION);
 
   public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) {
-    super(operand(LogicalWindow.class, any()), factory, null);
+    super(operand(Window.class, any()), factory, null);
   }
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof Window) {
-      Window window = call.rel(0);
-      // Only run the rule if the input isn't already an exchange node
-      return !PinotRuleUtils.isExchange(window.getInput());
-    }
-    return false;
+    Window window = call.rel(0);
+    return !PinotRuleUtils.isExchange(window.getInput());
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
     Window window = call.rel(0);
-    RelNode windowInput = window.getInput();
-
     // Perform all validations
     validateWindows(window);
 
+    RelNode input = window.getInput();
     Window.Group windowGroup = updateLiteralArgumentsInWindowGroup(window);
-    if (windowGroup.keys.isEmpty() && 
windowGroup.orderKeys.getKeys().isEmpty()) {
+    Exchange exchange;
+    if (windowGroup.keys.isEmpty()) {
       // Empty OVER()
-      // Add a single Exchange for empty OVER() since no sort is required
+      if (windowGroup.orderKeys.getKeys().isEmpty()) {
+        // Add a single Exchange for empty OVER() if sort is not required
 
-      if (PinotRuleUtils.isProject(windowInput)) {
-        // Check for empty LogicalProject below LogicalWindow. If present 
modify it to be a Literal only project and add
-        // a project above
-        Project project = (Project) ((HepRelVertex) 
windowInput).getCurrentRel();
-        if (project.getProjects().isEmpty()) {
-          RelNode returnedRelNode = handleEmptyProjectBelowWindow(window, 
project);
-          call.transformTo(returnedRelNode);
-          return;
+        if (PinotRuleUtils.isProject(input)) {
+          // Check for empty LogicalProject below LogicalWindow. If present, 
modify it to be a Literal only project and
+          // add a project above.
+          Project project = (Project) ((HepRelVertex) input).getCurrentRel();
+          if (project.getProjects().isEmpty()) {
+            RelNode returnedRelNode = handleEmptyProjectBelowWindow(window, 
project);
+            call.transformTo(returnedRelNode);
+            return;
+          }
         }
-      }
 
-      PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput,
-          RelDistributions.hash(Collections.emptyList()));
-      call.transformTo(
-          LogicalWindow.create(window.getTraitSet(), exchange, 
window.constants, window.getRowType(),
-              List.of(windowGroup)));
-    } else if (windowGroup.keys.isEmpty() && 
!windowGroup.orderKeys.getKeys().isEmpty()) {
-      // Only ORDER BY
-      // Add a LogicalSortExchange with collation on the order by key(s) and 
an empty hash partition key
-      // TODO: ORDER BY only type queries need to be sorted on both sender and 
receiver side for better performance.
-      //       Sorted input data can use a k-way merge instead of a 
PriorityQueue for sorting. For now support to
-      //       sort on the sender side is not available thus setting this up 
to only sort on the receiver.
-      PinotLogicalSortExchange sortExchange = 
PinotLogicalSortExchange.create(windowInput,
-          RelDistributions.hash(Collections.emptyList()), 
windowGroup.orderKeys, false, true);
-      call.transformTo(LogicalWindow.create(window.getTraitSet(), 
sortExchange, window.constants, window.getRowType(),
-          List.of(windowGroup)));
+        // TODO: Revisit whether we should use hash distribution
+        exchange = PinotLogicalExchange.create(input, 
RelDistributions.hash(List.of()));
+      } else {
+        // Only ORDER BY
+        // Add a LogicalSortExchange with collation on the order by key(s) and 
an empty hash partition key
+        // TODO: ORDER BY only type queries need to be sorted on both sender 
and receiver side for better performance.
+        //       Sorted input data can use a k-way merge instead of a 
PriorityQueue for sorting. For now support to
+        //       sort on the sender side is not available thus setting this up 
to only sort on the receiver.
+        // TODO: Revisit whether we should use hash distribution
+        exchange =
+            PinotLogicalSortExchange.create(input, 
RelDistributions.hash(List.of()), windowGroup.orderKeys, false,
+                true);
+      }
     } else {
       // All other variants
       // Assess whether this is a PARTITION BY only query or not (includes 
queries of the type where PARTITION BY and
@@ -134,10 +127,7 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
       if (isPartitionByOnly) {
         // Only PARTITION BY or PARTITION BY and ORDER BY on the same key(s)
         // Add an Exchange hashed on the partition by keys
-        PinotLogicalExchange exchange = 
PinotLogicalExchange.create(windowInput,
-            RelDistributions.hash(windowGroup.keys.toList()));
-        call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, 
window.constants, window.getRowType(),
-            List.of(windowGroup)));
+        exchange = PinotLogicalExchange.create(input, 
RelDistributions.hash(windowGroup.keys.toList()));
       } else {
         // PARTITION BY and ORDER BY on different key(s)
         // Add a LogicalSortExchange hashed on the partition by keys and 
collation based on order by keys
@@ -145,12 +135,13 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
         //       that the data is already partitioned and sorting can be done 
on the sender side instead. This way
         //       sorting on the receiver side can be a no-op. Add support for 
this hint and pass it on. Until sender
         //       side sorting is implemented, setting this hint will throw an 
error on execution.
-        PinotLogicalSortExchange sortExchange = 
PinotLogicalSortExchange.create(windowInput,
-            RelDistributions.hash(windowGroup.keys.toList()), 
windowGroup.orderKeys, false, true);
-        call.transformTo(LogicalWindow.create(window.getTraitSet(), 
sortExchange, window.constants, window.getRowType(),
-            List.of(windowGroup)));
+        exchange = PinotLogicalSortExchange.create(input, 
RelDistributions.hash(windowGroup.keys.toList()),
+            windowGroup.orderKeys, false, true);
       }
     }
+    // NOTE: Need to create a new LogicalWindow to use the modified window 
group.
+    call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, 
window.constants, window.getRowType(),
+        List.of(windowGroup)));
   }
 
   private Window.Group updateLiteralArgumentsInWindowGroup(Window window) {
@@ -288,8 +279,9 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
     final List<RexNode> expsForProjectBelowWindow = Collections.singletonList(
         rexBuilder.makeLiteral(0, 
cluster.getTypeFactory().createSqlType(SqlTypeName.INTEGER)));
     final List<String> expsFieldNamesBelowWindow = 
Collections.singletonList("winLiteral");
-    Project projectBelowWindow = LogicalProject.create(project.getInput(), 
project.getHints(),
-        expsForProjectBelowWindow, expsFieldNamesBelowWindow);
+    Project projectBelowWindow =
+        LogicalProject.create(project.getInput(), project.getHints(), 
expsForProjectBelowWindow,
+            expsFieldNamesBelowWindow);
 
     // Fix up the inputs to the Window to include the literal column and add 
an exchange
     final RelDataTypeFactory.Builder outputBuilder = 
cluster.getTypeFactory().builder();
@@ -300,8 +292,8 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
     // ROW_NUMBER(). Add an Exchange with empty hash distribution list
     PinotLogicalExchange exchange =
         PinotLogicalExchange.create(projectBelowWindow, 
RelDistributions.hash(Collections.emptyList()));
-    Window newWindow = new LogicalWindow(window.getCluster(), 
window.getTraitSet(), exchange,
-        window.getConstants(), outputBuilder.build(), window.groups);
+    Window newWindow = new LogicalWindow(window.getCluster(), 
window.getTraitSet(), exchange, window.getConstants(),
+        outputBuilder.build(), window.groups);
 
     // Create the LogicalProject above window to remove the literal column
     final List<RexNode> expsForProjectAboveWindow = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to