This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 782b6979360 [multistage] Multiple Window Group Support (#16109)
782b6979360 is described below

commit 782b6979360b6ec28c869bbf8b7ea59e1548a3ef
Author: Xuanyi Li <[email protected]>
AuthorDate: Tue Jul 8 08:38:55 2025 -0700

    [multistage] Multiple Window Group Support (#16109)
---
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   2 +
 .../calcite/rel/rules/PinotWindowSplitRule.java    | 181 ++++++++++++
 .../rel/rules/PinotWindowSplitRuleTest.java        | 311 +++++++++++++++++++++
 .../resources/queries/PhysicalOptimizerPlans.json  |  73 +++++
 .../resources/queries/WindowFunctionPlans.json     |  80 ++++--
 5 files changed, 627 insertions(+), 20 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 7607b0e0d42..191a2005884 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -222,6 +222,7 @@ public class PinotQueryRuleSets {
       PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE,
       PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE,
       PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE,
+      PinotWindowSplitRule.INSTANCE,
       PinotWindowExchangeNodeInsertRule.INSTANCE,
       PinotSetOpExchangeNodeInsertRule.INSTANCE,
 
@@ -241,6 +242,7 @@ public class PinotQueryRuleSets {
       PinotLogicalAggregateRule.SortProjectAggregate.INSTANCE,
       PinotLogicalAggregateRule.SortAggregate.INSTANCE,
       PinotLogicalAggregateRule.PinotLogicalAggregateConverter.INSTANCE,
+      PinotWindowSplitRule.INSTANCE,
       // Evaluate the Literal filter nodes
       CoreRules.FILTER_REDUCE_EXPRESSIONS
   );
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
new file mode 100644
index 00000000000..16d5dd61ab8
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.sql.SqlAggFunction;
+
+/**
+ * A RelOptRule to split a single LogicalWindow with multiple window groups
+ * into a chain of LogicalWindows, where each has exactly one window group.
+ *
+ * This version correctly handles window expressions that refer to constants
+ * by shifting RexInputRef pointers as the input field count changes down the 
chain.
+ */
+public class PinotWindowSplitRule extends RelOptRule {
+
+  public static final PinotWindowSplitRule INSTANCE = new 
PinotWindowSplitRule();
+
+  private PinotWindowSplitRule() {
+    super(operand(LogicalWindow.class, any()), "PinotWindowSplitterRule");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    LogicalWindow window = call.rel(0);
+    return window.groups.size() > 1;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final LogicalWindow originalWindow = call.rel(0);
+    final List<Window.Group> groups = originalWindow.groups;
+
+    RelNode currentInput = originalWindow.getInput();
+    final List<RelDataTypeField> originalOutputFields = 
originalWindow.getRowType().getFieldList();
+    final int originalInputFieldCount = 
currentInput.getRowType().getFieldCount();
+    final RelDataTypeFactory typeFactory = 
originalWindow.getCluster().getTypeFactory();
+
+    int cumulativeAggFieldCount = 0;
+
+    for (int i = 0; i < groups.size(); i++) {
+      Window.Group group = groups.get(i);
+      final RelDataType currentInputType = currentInput.getRowType();
+      final int currentInputFieldCount = currentInputType.getFieldCount();
+
+      // Only shift if this is not the first window in the chain.
+      if (i > 0) {
+        int shift = currentInputFieldCount - originalInputFieldCount;
+        if (shift > 0) {
+          RexConstantRefShifter shifter = new 
RexConstantRefShifter(originalInputFieldCount, shift);
+          group = shifter.apply(group);
+        }
+      }
+
+      // 1. Determine the RowType for the new single-group window.
+      List<RelDataTypeField> newWindowFields = new 
ArrayList<>(currentInputType.getFieldList());
+      for (int j = 0; j < group.aggCalls.size(); j++) {
+        int fieldIndexInOriginal = originalInputFieldCount + 
cumulativeAggFieldCount + j;
+        newWindowFields.add(originalOutputFields.get(fieldIndexInOriginal));
+      }
+      final RelDataType newWindowRowType = 
typeFactory.createStructType(newWindowFields);
+      cumulativeAggFieldCount += group.aggCalls.size();
+
+      // 2. Create the new LogicalWindow with the (potentially shifted) group.
+      // The newly created window becomes the input for the next iteration.
+      currentInput = new LogicalWindow(
+          originalWindow.getCluster(),
+          originalWindow.getTraitSet(),
+          originalWindow.getHints(),
+          currentInput,
+          originalWindow.getConstants(),
+          newWindowRowType,
+          ImmutableList.of(group));
+    }
+    call.transformTo(currentInput);
+  }
+
+  /**
+   * A RexShuttle that shifts indices of RexInputRefs that point to constants.
+   *
+   * A RexInputRef can point to an input field or a constant. If its index is 
>= originalInputFieldCount,
+   * it's a constant. When we chain windows, the input field count for 
subsequent windows increases,
+   * so we must shift the indices for these constant references to avoid them 
being misinterpreted
+   * as input field references.
+   */
+  static class RexConstantRefShifter extends RexShuttle {
+    private final int _originalInputFieldCount;
+    private final int _shift;
+
+    RexConstantRefShifter(int originalInputFieldCount, int shift) {
+      _originalInputFieldCount = originalInputFieldCount;
+      _shift = shift;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+      int index = inputRef.getIndex();
+      // If the index is greater than or equal to the original number of input 
fields,
+      // it refers to a constant, so we must shift it.
+      if (index >= _originalInputFieldCount) {
+        return new RexInputRef(index + _shift, inputRef.getType());
+      }
+      // Otherwise, it's a reference to a field from the original input 
relation,
+      // which does not need shifting.
+      return inputRef;
+    }
+
+    @Override
+    public RexNode visitCall(RexCall call) {
+      if (call instanceof Window.RexWinAggCall) {
+        Window.RexWinAggCall winCall = (Window.RexWinAggCall) call;
+        List<RexNode> newOperands = winCall.getOperands().stream()
+            .map(operand -> operand.accept(this))
+            .collect(Collectors.toList());
+        return new Window.RexWinAggCall(
+            (SqlAggFunction) winCall.getOperator(),
+            winCall.getType(),
+            newOperands,
+            winCall.ordinal,
+            winCall.distinct,
+            winCall.ignoreNulls
+        );
+      }
+      return super.visitCall(call);
+    }
+
+    /**
+     * Applies the shuttle to all expressions within a Window.Group.
+     */
+    public Window.Group apply(Window.Group group) {
+      List<Window.RexWinAggCall> newAggCalls = group.aggCalls.stream()
+          .map(agg -> (Window.RexWinAggCall) agg.accept(this))
+          .collect(Collectors.toList());
+
+      RexWindowBound newLowerBound = group.lowerBound.accept(this);
+      RexWindowBound newUpperBound = group.upperBound.accept(this);
+
+      return new Window.Group(
+          group.keys,
+          group.isRows,
+          newLowerBound,
+          newUpperBound,
+          group.exclude,
+          group.orderKeys,
+          newAggCalls
+      );
+    }
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
new file mode 100644
index 00000000000..be187a59709
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.rex.RexWindowExclusion;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.query.type.TypeFactory;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PinotWindowSplitRuleTest {
+  private static final TypeFactory TYPE_FACTORY = new TypeFactory();
+  private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private RelOptRuleCall _call;
+  @Mock
+  private HepRelVertex _input;
+  @Mock
+  private RelOptCluster _cluster;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    RelTraitSet traits = RelTraitSet.createEmpty();
+    Mockito.when(_input.getTraitSet()).thenReturn(traits);
+    Mockito.when(_input.getCluster()).thenReturn(_cluster);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testMatchesWithSingleGroup() {
+    // Test that the rule doesn't match when there's only one window group
+    LogicalWindow singleGroupWindow = createWindowWithGroups(1);
+    Mockito.when(_call.rel(0)).thenReturn(singleGroupWindow);
+
+    boolean matches = PinotWindowSplitRule.INSTANCE.matches(_call);
+    Assert.assertFalse(matches, "Rule should not match when there's only one 
window group");
+  }
+
+  @Test
+  public void testMatchesWithMultipleGroups() {
+    // Test that the rule matches when there are multiple window groups
+    LogicalWindow multiGroupWindow = createWindowWithGroups(3);
+    Mockito.when(_call.rel(0)).thenReturn(multiGroupWindow);
+
+    boolean matches = PinotWindowSplitRule.INSTANCE.matches(_call);
+    Assert.assertTrue(matches, "Rule should match when there are multiple 
window groups");
+  }
+
+  @Test
+  public void testOnMatchWithTwoGroups() {
+    // Test splitting a window with two groups into a chain of two windows
+    LogicalWindow originalWindow = createTestWindow(2);
+    Mockito.when(_call.rel(0)).thenReturn(originalWindow);
+
+    PinotWindowSplitRule.INSTANCE.onMatch(_call);
+
+    // Verify that transformTo was called with a LogicalWindow
+    ArgumentCaptor<RelNode> transformedNodeCapture = 
ArgumentCaptor.forClass(RelNode.class);
+    Mockito.verify(_call, 
Mockito.times(1)).transformTo(transformedNodeCapture.capture());
+
+    RelNode transformedNode = transformedNodeCapture.getValue();
+    Assert.assertTrue(transformedNode instanceof LogicalWindow,
+        "Transformed node should be a LogicalWindow");
+
+    LogicalWindow resultWindow = (LogicalWindow) transformedNode;
+    Assert.assertEquals(resultWindow.groups.size(), 1,
+        "Final window should have exactly one group");
+  }
+
+  @Test
+  public void testOnMatchWithThreeGroups() {
+    // Test splitting a window with three groups into a chain of three windows
+    LogicalWindow originalWindow = createTestWindow(3);
+    Mockito.when(_call.rel(0)).thenReturn(originalWindow);
+
+    PinotWindowSplitRule.INSTANCE.onMatch(_call);
+
+    // Verify that transformTo was called with a LogicalWindow
+    ArgumentCaptor<RelNode> transformedNodeCapture = 
ArgumentCaptor.forClass(RelNode.class);
+    Mockito.verify(_call, 
Mockito.times(1)).transformTo(transformedNodeCapture.capture());
+
+    RelNode transformedNode = transformedNodeCapture.getValue();
+    Assert.assertTrue(transformedNode instanceof LogicalWindow,
+        "Transformed node should be a LogicalWindow");
+
+    LogicalWindow resultWindow = (LogicalWindow) transformedNode;
+    Assert.assertEquals(resultWindow.groups.size(), 1,
+        "Final window should have exactly one group");
+  }
+
+  @Test
+  public void testRexConstantRefShifter() {
+    // Test the RexConstantRefShifter functionality
+    int originalInputFieldCount = 2;
+    int shift = 1;
+
+    // Create a RexInputRef that points to a constant (index >= 
originalInputFieldCount)
+    RexInputRef constantRef = new RexInputRef(3, 
TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+
+    // Create a RexInputRef that points to an input field (index < 
originalInputFieldCount)
+    RexInputRef inputFieldRef = new RexInputRef(1, 
TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+
+    // Create a window group with these references
+    Window.Group group = new Window.Group(
+        ImmutableBitSet.of(0),
+        true,
+        RexWindowBounds.UNBOUNDED_PRECEDING,
+        RexWindowBounds.UNBOUNDED_FOLLOWING,
+        RexWindowExclusion.EXCLUDE_NO_OTHER,
+        RelCollations.EMPTY,
+        Collections.emptyList()
+    );
+
+    // Apply the shifter
+    PinotWindowSplitRule.RexConstantRefShifter shifter =
+        new 
PinotWindowSplitRule.RexConstantRefShifter(originalInputFieldCount, shift);
+    Window.Group shiftedGroup = shifter.apply(group);
+
+    // The group should be the same since it doesn't contain the RexInputRefs 
we're testing
+    Assert.assertEquals(shiftedGroup, group);
+  }
+
+  @Test
+  public void testRexConstantRefShifterWithAggCalls() {
+    // Test the RexConstantRefShifter with window aggregate calls
+    int originalInputFieldCount = 2;
+    int shift = 1;
+
+    // Create RexInputRefs for testing
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputFieldRef = new RexInputRef(0, intType); // Input field
+    RexInputRef constantRef = new RexInputRef(3, intType);   // Constant
+
+    // Create window aggregate calls
+    Window.RexWinAggCall inputFieldAgg = new Window.RexWinAggCall(
+        new SqlSumAggFunction(intType), intType, List.of(inputFieldRef), 0, 
false, false);
+    Window.RexWinAggCall constantAgg = new Window.RexWinAggCall(
+        new SqlAvgAggFunction(SqlKind.AVG), intType, List.of(constantRef), 1, 
false, false);
+
+    // Create a window group with these aggregate calls
+    Window.Group group = new Window.Group(
+        ImmutableBitSet.of(0),
+        true,
+        RexWindowBounds.UNBOUNDED_PRECEDING,
+        RexWindowBounds.UNBOUNDED_FOLLOWING,
+        RexWindowExclusion.EXCLUDE_NO_OTHER,
+        RelCollations.EMPTY,
+        Arrays.asList(inputFieldAgg, constantAgg)
+    );
+
+    // Apply the shifter
+    PinotWindowSplitRule.RexConstantRefShifter shifter =
+        new 
PinotWindowSplitRule.RexConstantRefShifter(originalInputFieldCount, shift);
+    Window.Group shiftedGroup = shifter.apply(group);
+
+    // Verify that the input field reference wasn't shifted
+    Window.RexWinAggCall shiftedInputFieldAgg = shiftedGroup.aggCalls.get(0);
+    RexInputRef shiftedInputFieldRef = (RexInputRef) 
shiftedInputFieldAgg.getOperands().get(0);
+    Assert.assertEquals(shiftedInputFieldRef.getIndex(), 0,
+        "Input field reference should not be shifted");
+
+    // Verify that the constant reference was shifted
+    Window.RexWinAggCall shiftedConstantAgg = shiftedGroup.aggCalls.get(1);
+    RexInputRef shiftedConstantRef = (RexInputRef) 
shiftedConstantAgg.getOperands().get(0);
+    Assert.assertEquals(shiftedConstantRef.getIndex(), 4,
+        "Constant reference should be shifted by " + shift);
+  }
+
+  private LogicalWindow createWindowWithGroups(int numGroups) {
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputRef = new RexInputRef(0, intType);
+
+    List<Window.Group> groups = createWindowGroups(numGroups);
+
+    List<RexLiteral> constants = Collections.emptyList();
+    return LogicalWindow.create(RelTraitSet.createEmpty(), _input, constants, 
intType, groups);
+  }
+
+  private List<Window.Group> createWindowGroups(int numGroups) {
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputRef = new RexInputRef(0, intType);
+
+    List<Window.Group> groups = new ArrayList<>();
+    for (int i = 0; i < numGroups; i++) {
+      Window.RexWinAggCall aggCall = new Window.RexWinAggCall(
+          new SqlSumAggFunction(intType), intType, List.of(inputRef), i, 
false, false);
+      Window.Group group = new Window.Group(
+          ImmutableBitSet.of(0),
+          true,
+          RexWindowBounds.UNBOUNDED_PRECEDING,
+          RexWindowBounds.UNBOUNDED_FOLLOWING,
+          RexWindowExclusion.EXCLUDE_NO_OTHER,
+          RelCollations.EMPTY,
+          List.of(aggCall)
+      );
+      groups.add(group);
+    }
+    return groups;
+  }
+
+  private LogicalWindow createTestWindow(int numGroups) {
+    final RelDataType intType = 
TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputRef = new RexInputRef(0, intType);
+
+    // Create a real RelOptCluster
+    HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
+    RelOptCluster cluster = RelOptCluster.create(planner, new 
RexBuilder(TYPE_FACTORY));
+
+    // Create a real input project with real row type
+    final List<RelDataTypeField> inputFields = createInputFields(1);
+    RelDataType inputRowType = TYPE_FACTORY.createStructType(inputFields);
+    LogicalProject inputProject = LogicalProject.create(
+        LogicalValues.create(cluster, inputRowType, ImmutableList.of()),
+        Collections.emptyList(),
+        List.of(inputRef),
+        inputRowType
+    );
+    Mockito.when(_input.getCurrentRel()).thenReturn(inputProject);
+    Mockito.when(_input.getCluster()).thenReturn(cluster);
+
+    // Create real window groups
+    List<Window.Group> groups = createWindowGroups(numGroups);
+    List<RexLiteral> constants = Collections.emptyList();
+
+    // Create real struct row type for the window
+    List<RelDataTypeField> windowFields = new ArrayList<>(inputFields);
+    for (int i = 0; i < groups.size(); i++) {
+      final int aggIdx = i;
+      windowFields.add(new RelDataTypeFieldImpl(
+        "agg_field" + aggIdx,
+        inputFields.size() + aggIdx,
+        intType
+      ));
+    }
+    RelDataType windowRowType = TYPE_FACTORY.createStructType(windowFields);
+
+    return LogicalWindow.create(
+        RelTraitSet.createEmpty(), inputProject, constants, windowRowType, 
groups);
+  }
+
+  private List<RelDataTypeField> createInputFields(int count) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      fields.add(new RelDataTypeFieldImpl(
+        "field" + i,
+        i,
+        TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)
+      ));
+    }
+    return fields;
+  }
+}
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index bd5137c837d..b669891ce5b 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -882,5 +882,78 @@
         ]
       }
     ]
+  },
+  "physical_opt_window_functions": {
+    "queries": [
+      {
+        "description": "Multiple window groups",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 
MIN(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3), MAX(a.col3) 
OVER(PARTITION BY a.col1 ORDER BY a.col3), ROW_NUMBER() OVER(PARTITION BY 
a.col1 ORDER BY a.col3), RANK() OVER(PARTITION BY a.col1 ORDER BY a.col3), 
LAG(a.col3, 1, '0') OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+        "notes": "the table is partitioned by col2, thus we don't need to 
shuffle the data for the first window function",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject($0=[$3], $1=[$4], $2=[$7], $3=[$5], $4=[$6])",
+          "\n    PhysicalWindow(window#0=[window(partition {0} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])], 
constants=[[1, _UTF-8'0']])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
collation=[[2]])",
+          "\n        PhysicalWindow(window#0=[window(partition {0} order by 
[2] aggs [MAX($2), RANK(), LAG($2, 1, _UTF-8'0')])], constants=[[1, 
_UTF-8'0']])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
collation=[[2]])",
+          "\n            PhysicalWindow(window#0=[window(partition {1} order 
by [2] aggs [MIN($2)])], constants=[[1, _UTF-8'0']])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], collation=[[2]])",
+          "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Multiple window groups",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 
MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3), MIN(a.col3) 
OVER(PARTITION BY a.col2 ORDER BY a.col3), SUM(a.col3) OVER(PARTITION BY a.col3 
ORDER BY a.col1) FROM a",
+        "notes": "the physical optimizer currently doesn't support reordering window 
nodes based on the data distribution; the first window function is 
processed first, i.e. it becomes the lowest window node in the plan tree",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject($0=[$3], $1=[$4], $2=[$5])",
+          "\n    PhysicalWindow(window#0=[window(partition {2} order by [0] 
aggs [SUM($2)])])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[2]], collation=[[0]])",
+          "\n        PhysicalWindow(window#0=[window(partition {1} order by 
[2] aggs [MIN($2)])])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]], 
collation=[[2]])",
+          "\n            PhysicalWindow(window#0=[window(partition {0} order 
by [2] aggs [MAX($2)])])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
collation=[[2]])",
+          "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Multiple window groups",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 
SUM(a.col3) OVER(ORDER BY a.col2), MIN(a.col3) OVER(PARTITION BY a.col2) FROM 
a",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject($0=[$2], $1=[$3])",
+          "\n    PhysicalWindow(window#0=[window(partition {0} aggs 
[MIN($1)])])",
+          "\n      PhysicalWindow(window#0=[window(order by [0] aggs 
[SUM($1)])])",
+          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
collation=[[0]])",
+          "\n          PhysicalProject(col2=[$1], col3=[$2])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Multiple window groups",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT 
SUM(a.col3) OVER(ORDER BY a.col2, a.col1), MIN(a.col3) OVER(ORDER BY a.col1, 
a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject($0=[$3], $1=[$4])",
+          "\n    PhysicalWindow(window#0=[window(order by [0, 1] aggs 
[MIN($2)])])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
collation=[[0, 1]])",
+          "\n        PhysicalWindow(window#0=[window(order by [1, 0] aggs 
[SUM($2)])])",
+          "\n          PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
collation=[[1, 0]])",
+          "\n            PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      }
+    ]
   }
 }
diff --git 
a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json 
b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index f1f6da0973f..6f1700c36f0 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -3601,33 +3601,78 @@
       },
       {
         "description": "Multiple window groups",
-        "notes": "not yet supported",
-        "ignored": true,
-        "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2 
ORDER BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a"
+        "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2 
ORDER BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs 
[MAX($2)])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalWindow(window#0=[window(partition {1} order by [2] 
aggs [MIN($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       },
       {
         "description": "Multiple window groups",
-        "notes": "not yet supported",
-        "ignored": true,
-        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY 
a.col2), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3) FROM a"
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY 
a.col2), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [1] aggs 
[SUM($1)])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalWindow(window#0=[window(partition {0} aggs 
[COUNT($1)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       },
       {
         "description": "Multiple window groups",
-        "notes": "not yet supported",
-        "ignored": true,
-        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(), MAX(a.col3) 
OVER(PARTITION BY a.col2) FROM a"
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(), MAX(a.col3) 
OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)], 
EXPR$1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [MAX($1)])])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalWindow(window#0=[window(aggs [SUM($1), 
COUNT($1)])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       },
       {
         "description": "Multiple window groups",
-        "notes": "not yet supported",
-        "ignored": true,
-        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2), 
MIN(a.col3) OVER(PARTITION BY a.col2) FROM a"
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2), 
MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($1)])])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalWindow(window#0=[window(order by [0] aggs 
[SUM($1)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       },
       {
         "description": "Multiple window groups",
-        "notes": "not yet supported",
-        "ignored": true,
-        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2, 
a.col1), MIN(a.col3) OVER(ORDER BY a.col1, a.col2) FROM a"
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2, 
a.col1), MIN(a.col3) OVER(ORDER BY a.col1, a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [0, 1] aggs 
[MIN($2)])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0, 
1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalWindow(window#0=[window(order by [1, 0] aggs 
[SUM($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       }
     ]
   },
@@ -3678,11 +3723,6 @@
         "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ORDER BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
         "expectedException": ".*ROW/RANGE not allowed with RANK, DENSE_RANK, 
ROW_NUMBER, PERCENTILE_CONT/DISC or LAG/LEAD functions.*"
       },
-      {
-        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - default frame for ROW_NUMBER is different from aggregation window 
functions, resulting in multiple window groups",
-        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ORDER BY a.col2), SUM(a.col1) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a",
-        "expectedException": ".*Currently only 1 window group is supported, 
query has 2 groups.*"
-      },
       {
         "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - custom frames not allowed",
         "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to