This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e3fab5bf96 Make WindowFrames more specific (#16741)
7e3fab5bf96 is described below

commit 7e3fab5bf961e2fbfdfc60e24b63ccc0b200e900
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Jul 25 04:57:36 2024 +0200

    Make WindowFrames more specific (#16741)
    
    Changes the WindowFrame internals / representation a bit; introduces 
dedicated frame types for rows and groups, which correspond to the implemented 
processing methods.
---
 docs/querying/sql-window-functions.md              |   3 -
 .../org/apache/druid/msq/exec/MSQWindowTest.java   |  69 ++----
 .../druid/query/operator/window/WindowFrame.java   | 259 ++++++++++++---------
 .../semantic/DefaultFramedOnHeapAggregatable.java  |  46 ++--
 .../query/operator/window/WindowFrameTest.java     |  70 ++++++
 .../window/WindowFramedAggregateProcessorTest.java |   4 +-
 .../semantic/FramedOnHeapAggregatableTest.java     |  77 ++----
 .../apache/druid/sql/calcite/rel/Windowing.java    |  23 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |   2 +-
 .../druid/sql/calcite/CalciteWindowQueryTest.java  |  50 ++--
 .../calcite/tests/window/WindowOpReorder.sqlTest   |  16 +-
 .../calcite/tests/window/aggregateConstant.sqlTest |   2 +-
 .../tests/window/defaultBoundCurrentRow.sqlTest    |  43 ++--
 .../calcite/tests/window/no_grouping.sqlTest       |  28 ++-
 .../calcite/tests/window/no_grouping2.sqlTest      |   2 +-
 .../calcite/tests/window/rank_handling.sqlTest     |   3 +
 .../calcite/tests/window/simpleSum.sqlTest         |   9 +-
 .../calcite/tests/window/virtualColumns.sqlTest    |   2 +-
 .../wikipediaAggregationsMultipleOrdering.sqlTest  |   9 +-
 .../window/wikipediaCumulativeOrdered.sqlTest      |  11 +-
 .../window/wikipediaFramedAggregations.sqlTest     |   8 +-
 .../tests/window/wikipediaSimplePartition.sqlTest  |   2 +-
 .../wikipediaSimplePartitionInitialSort.sqlTest    |   2 +-
 .../tests/window/windowed_long_null.sqlTest        |   4 +-
 24 files changed, 375 insertions(+), 369 deletions(-)

diff --git a/docs/querying/sql-window-functions.md 
b/docs/querying/sql-window-functions.md
index d64538779f0..7c2c3aef53e 100644
--- a/docs/querying/sql-window-functions.md
+++ b/docs/querying/sql-window-functions.md
@@ -246,11 +246,8 @@ Druid has guardrail logic to prevent you from executing 
window function queries
 
 For example:
 - You cannot set expressions as bounds for window frames.
-- You cannot use two FOLLOWING expressions in the window frame. For example: 
`ROWS BETWEEN 2 ROWS FOLLOWING and 3 ROWS FOLLOWING`.
 - You can only use a RANGE frames when both endpoints are unbounded or current 
row.
 
-If you write a query that violates one of these conditions, Druid throws an 
error: "The query contains a window frame which may return incorrect results. 
To disregard this warning, set `windowingStrictValidation` to false in the 
query context."
-
 ## Window function reference
 
 |Function|Notes|
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
index afbedb7d704..5cc84ac6ee6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
@@ -106,7 +106,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d0")
     };
@@ -196,7 +196,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d1")
     };
@@ -306,7 +306,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d1")
     };
@@ -419,7 +419,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d0")
     };
@@ -523,7 +523,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d0")
     };
@@ -589,7 +589,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -654,7 +654,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -725,7 +725,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -793,7 +793,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "d1")
     };
@@ -878,7 +878,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -951,7 +951,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1028,17 +1028,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("m2", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(
-        WindowFrame.PeerType.RANGE,
-        true,
-        0,
-        false,
-        0,
-        ImmutableList.of(new ColumnWithDirection(
-            "m1",
-            ColumnWithDirection.Direction.ASC
-        ))
-    );
+    final WindowFrame theFrame = WindowFrame.forOrderBy("m1");
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1142,14 +1132,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("m2", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(
-        WindowFrame.PeerType.ROWS,
-        true,
-        0,
-        true,
-        0,
-        null
-    );
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1233,7 +1216,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.DOUBLE)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1322,14 +1305,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("d3", ColumnType.STRING)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(
-        WindowFrame.PeerType.ROWS,
-        true,
-        0,
-        true,
-        0,
-        null
-    );
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1412,14 +1388,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("d3", ColumnType.STRING)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(
-        WindowFrame.PeerType.ROWS,
-        true,
-        0,
-        true,
-        0,
-        null
-    );
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new DoubleSumAggregatorFactory("w0", "m1")
     };
@@ -1796,7 +1765,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.LONG)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new LongSumAggregatorFactory("w0", "added")
     };
@@ -1887,7 +1856,7 @@ public class MSQWindowTest extends MSQTestBase
                                             .add("cc", ColumnType.LONG)
                                             .build();
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new LongSumAggregatorFactory("w0", "added")
     };
@@ -2001,7 +1970,7 @@ public class MSQWindowTest extends MSQTestBase
                                            .build();
 
 
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, true, 0, null);
+    final WindowFrame theFrame = WindowFrame.unbounded();
     final AggregatorFactory[] theAggs = {
         new LongSumAggregatorFactory("w0", "d1")
     };
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java
 
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java
index fca50c25b28..2dd827d323e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java
@@ -21,159 +21,192 @@ package org.apache.druid.query.operator.window;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
+
+import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
-public class WindowFrame
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "rows", value = WindowFrame.Rows.class),
+    @JsonSubTypes.Type(name = "groups", value = WindowFrame.Groups.class),
+})
+@SubclassesMustOverrideEqualsAndHashCode
+public interface WindowFrame
 {
-  public static WindowFrame unbounded()
+  static WindowFrame unbounded()
   {
-    return new WindowFrame(PeerType.ROWS, true, 0, true, 0, null);
+    return rows(null, null);
   }
 
-  @SuppressWarnings("unused")
-  public enum PeerType
+  static Rows rows(Integer lowerOffset, Integer upperOffset)
   {
-    ROWS,
-    RANGE
+    return new WindowFrame.Rows(lowerOffset, upperOffset);
   }
 
-  // Will likely need to add the order by columns to also be able to deal with 
RANGE peer type.
-  private final PeerType peerType;
-  private final boolean lowerUnbounded;
-  private final int lowerOffset;
-  private final boolean upperUnbounded;
-  private final int upperOffset;
-  private final List<ColumnWithDirection> orderBy;
-
-  @JsonCreator
-  public WindowFrame(
-      @JsonProperty("peerType") PeerType peerType,
-      @JsonProperty("lowUnbounded") boolean lowerUnbounded,
-      @JsonProperty("lowOffset") int lowerOffset,
-      @JsonProperty("uppUnbounded") boolean upperUnbounded,
-      @JsonProperty("uppOffset") int upperOffset,
-      @JsonProperty("orderBy") List<ColumnWithDirection> orderBy
-  )
+  static Groups groups(Integer lowerOffset, Integer upperOffset, List<String> 
orderByColumns)
   {
-    this.peerType = peerType;
-    this.lowerUnbounded = lowerUnbounded;
-    this.lowerOffset = lowerOffset;
-    this.upperUnbounded = upperUnbounded;
-    this.upperOffset = upperOffset;
-    this.orderBy = orderBy;
+    return new WindowFrame.Groups(lowerOffset, upperOffset, orderByColumns);
   }
 
-  @JsonProperty("peerType")
-  public PeerType getPeerType()
+  static WindowFrame forOrderBy(String... orderByColumns)
   {
-    return peerType;
+    return groups(null, 0, Lists.newArrayList(orderByColumns));
   }
 
-  @JsonProperty("lowUnbounded")
-  public boolean isLowerUnbounded()
+  abstract class OffsetFrame implements WindowFrame
   {
-    return lowerUnbounded;
-  }
+    @JsonProperty
+    public final Integer lowerOffset;
+    @JsonProperty
+    public final Integer upperOffset;
 
-  @JsonProperty("lowOffset")
-  public int getLowerOffset()
-  {
-    return lowerOffset;
-  }
+    @JsonCreator
+    public OffsetFrame(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset)
+    {
+      this.lowerOffset = lowerOffset;
+      this.upperOffset = upperOffset;
+    }
 
-  @JsonProperty("uppUnbounded")
-  public boolean isUpperUnbounded()
-  {
-    return upperUnbounded;
-  }
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(lowerOffset, upperOffset);
+    }
 
-  @JsonProperty("uppOffset")
-  public int getUpperOffset()
-  {
-    return upperOffset;
-  }
+    /**
+     * Calculates the applicable lower offset if the max number of rows is
+     * known.
+     */
+    public int getLowerOffsetClamped(int maxRows)
+    {
+      if (lowerOffset == null) {
+        return -maxRows;
+      }
+      return Math.max(-maxRows, lowerOffset);
+    }
 
-  @JsonProperty("orderBy")
-  public List<ColumnWithDirection> getOrderBy()
-  {
-    return orderBy;
+    /**
+     * Calculates the applicable upper offset if the max number of rows is
+     * known.
+     */
+    public int getUpperOffsetClamped(int maxRows)
+    {
+      if (upperOffset == null) {
+        return maxRows;
+      }
+      return Math.min(maxRows, upperOffset);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OffsetFrame other = (OffsetFrame) obj;
+      return Objects.equals(lowerOffset, other.lowerOffset) && 
Objects.equals(upperOffset, other.upperOffset);
+    }
+
+    @Override
+    public abstract String toString();
   }
 
-  @Override
-  public boolean equals(Object o)
+  class Rows extends OffsetFrame
   {
-    if (this == o) {
-      return true;
+    @JsonCreator
+    public Rows(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset)
+    {
+      super(lowerOffset, upperOffset);
     }
-    if (!(o instanceof WindowFrame)) {
-      return false;
+
+    @Override
+    public String toString()
+    {
+      return "WindowFrame.Rows ["
+          + "lowerOffset=" + lowerOffset +
+          ", upperOffset=" + upperOffset +
+          "]";
     }
-    WindowFrame that = (WindowFrame) o;
-    return lowerUnbounded == that.lowerUnbounded
-           && lowerOffset == that.lowerOffset
-           && upperUnbounded == that.upperUnbounded
-           && upperOffset == that.upperOffset
-           && peerType == that.peerType
-           && Objects.equals(orderBy, that.orderBy);
   }
 
-  @Override
-  public int hashCode()
+  class Groups extends OffsetFrame
   {
-    return Objects.hash(peerType, lowerUnbounded, lowerOffset, upperUnbounded, 
upperOffset, orderBy);
-  }
+    @JsonProperty
+    private final ImmutableList<String> orderByColumns;
 
-  @Override
-  public String toString()
-  {
-    return "WindowFrame{" +
-           "peerType=" + peerType +
-           ", lowerUnbounded=" + lowerUnbounded +
-           ", lowerOffset=" + lowerOffset +
-           ", upperUnbounded=" + upperUnbounded +
-           ", upperOffset=" + upperOffset +
-           ", orderBy=" + orderBy +
-           '}';
-  }
+    @JsonCreator
+    public Groups(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset,
+        @JsonProperty("orderByColumns") List<String> orderByColumns)
+    {
+      super(lowerOffset, upperOffset);
+      this.orderByColumns = ImmutableList.copyOf(orderByColumns);
+    }
 
-  public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
-  {
-    return new WindowFrame(PeerType.RANGE, true, 0, false, 0, 
Lists.newArrayList(orderBy));
-  }
+    public List<String> getOrderByColumns()
+    {
+      return orderByColumns;
+    }
 
-  public List<String> getOrderByColNames()
-  {
-    if (orderBy == null) {
-      return Collections.emptyList();
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(lowerOffset, orderByColumns, upperOffset);
     }
-    return 
orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
-  }
 
-  /**
-   * Calculates the applicable lower offset if the max number of rows is known.
-   */
-  public int getLowerOffsetClamped(int maxRows)
-  {
-    if (lowerUnbounded) {
-      return -maxRows;
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      Groups other = (Groups) obj;
+      return Objects.equals(lowerOffset, other.lowerOffset)
+          && Objects.equals(orderByColumns, other.orderByColumns)
+          && Objects.equals(upperOffset, other.upperOffset);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "WindowFrame.Groups [" +
+          "lowerOffset=" + lowerOffset +
+          ", upperOffset=" + upperOffset +
+          ", orderByColumns=" + orderByColumns + "]";
     }
-    return Math.max(-maxRows, lowerOffset);
   }
 
-  /**
-   * Calculates the applicable upper offset if the max number of rows is known.
-   */
-  public int getUpperOffsetClamped(int maxRows)
+  @SuppressWarnings("unchecked")
+  @Nullable
+  default <T extends WindowFrame> T unwrap(Class<T> clazz)
   {
-    if (upperUnbounded) {
-      return maxRows;
+    if (clazz.isInstance(this)) {
+      return (T) this;
     }
-    return Math.min(maxRows, upperOffset);
+    return null;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
index 83952873050..7130fafd867 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.rowsandcols.semantic;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -28,6 +29,9 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.query.operator.window.WindowFrame;
+import org.apache.druid.query.operator.window.WindowFrame.Groups;
+import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame;
+import org.apache.druid.query.operator.window.WindowFrame.Rows;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -106,22 +110,38 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
   public static Iterable<AggInterval> 
buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
   {
     int numRows = rac.numRows();
-    if (frame.getLowerOffsetClamped(numRows) == -numRows && 
frame.getUpperOffsetClamped(numRows) == numRows) {
-      return buildUnboundedIteratorFor(rac, frame);
-    } else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) {
-      return buildGroupIteratorFor(rac, frame);
-    } else {
-      return buildRowIteratorFor(rac, frame);
+    if (isEffectivelyUnbounded(frame, numRows)) {
+      return buildUnboundedIteratorFor(rac);
     }
+    Rows rowsFrame = frame.unwrap(WindowFrame.Rows.class);
+    if (rowsFrame != null) {
+      return buildRowIteratorFor(rac, rowsFrame);
+    }
+    Groups groupsFrame = frame.unwrap(WindowFrame.Groups.class);
+    if (groupsFrame != null) {
+      return buildGroupIteratorFor(rac, groupsFrame);
+    }
+    throw DruidException.defensive("Unable to handle WindowFrame [%s]!", 
frame);
   }
 
-  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static boolean isEffectivelyUnbounded(WindowFrame frame, int numRows)
   {
-    int[] groupBoundaries = new int[]{0, rac.numRows()};
-    return new GroupIteratorForWindowFrame(frame, groupBoundaries);
+    OffsetFrame offsetFrame = frame.unwrap(WindowFrame.OffsetFrame.class);
+    if (offsetFrame.getLowerOffsetClamped(numRows) == -numRows
+        && offsetFrame.getUpperOffsetClamped(numRows) == numRows) {
+      // regardless the actual mode; all rows will be inside the frame!
+      return true;
+    }
+    return false;
+  }
+
+  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac)
+  {
+    int[] groupBoundaries = new int[] {0, rac.numRows()};
+    return new GroupIteratorForWindowFrame(WindowFrame.rows(null, null), 
groupBoundaries);
   }
 
-  private static Iterable<AggInterval> 
buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static Iterable<AggInterval> 
buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Rows frame)
   {
     int[] groupBoundaries = new int[rac.numRows() + 1];
     for (int j = 0; j < groupBoundaries.length; j++) {
@@ -130,9 +150,9 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     return new GroupIteratorForWindowFrame(frame, groupBoundaries);
   }
 
-  private static Iterable<AggInterval> 
buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static Iterable<AggInterval> 
buildGroupIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Groups frame)
   {
-    int[] groupBoundaries = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+    int[] groupBoundaries = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColumns());
     return new GroupIteratorForWindowFrame(frame, groupBoundaries);
   }
 
@@ -145,7 +165,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     // upper exclusive
     private final int upperOffset;
 
-    public GroupIteratorForWindowFrame(WindowFrame frame, int[] 
groupBoundaries)
+    public GroupIteratorForWindowFrame(WindowFrame.OffsetFrame frame, int[] 
groupBoundaries)
     {
       this.groupBoundaries = groupBoundaries;
       numGroups = groupBoundaries.length - 1;
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java
new file mode 100644
index 00000000000..855f4694f43
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFrameTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.window;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.query.operator.window.WindowFrame.OffsetFrame;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class WindowFrameTest
+{
+  @Test
+  public void testEqualsRows()
+  {
+    EqualsVerifier.forClass(WindowFrame.Rows.class)
+        .usingGetClass()
+        .verify();
+  }
+
+  @Test
+  public void testEqualsGroups()
+  {
+    EqualsVerifier.forClass(WindowFrame.Groups.class)
+        .usingGetClass()
+        .verify();
+  }
+
+  @Test
+  public void testOffsetFrameUnbounded()
+  {
+    OffsetFrame of = new WindowFrame.Rows(null, null);
+    assertEquals(-100, of.getLowerOffsetClamped(100));
+    assertEquals(100, of.getUpperOffsetClamped(100));
+  }
+
+  @Test
+  public void testOffsetFrameNormal()
+  {
+    OffsetFrame of = new WindowFrame.Rows(-1, 2);
+    assertEquals(-1, of.getLowerOffsetClamped(100));
+    assertEquals(2, of.getUpperOffsetClamped(100));
+  }
+
+  @Test
+  public void testOffsetFrameUnbounded2()
+  {
+    OffsetFrame of = new WindowFrame.Rows(-200, 200);
+    assertEquals(-100, of.getLowerOffsetClamped(100));
+    assertEquals(100, of.getUpperOffsetClamped(100));
+  }
+
+}
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
index 5af321b53c8..9bae78bc2cc 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
@@ -46,7 +46,7 @@ public class WindowFramedAggregateProcessorTest
   @Test
   public void testIsPassThruWhenRACReturnsSemanticInterface()
   {
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, false, 0, null);
+    final WindowFrame theFrame = WindowFrame.rows(null, 0);
     final AggregatorFactory[] theAggs = {
         new LongMaxAggregatorFactory("cummMax", "intCol"),
         new DoubleSumAggregatorFactory("cummSum", "doubleCol")
@@ -78,7 +78,7 @@ public class WindowFramedAggregateProcessorTest
   @Test
   public void testDoesStuffWhenNoSemanticInterfacesAvailable()
   {
-    final WindowFrame theFrame = new WindowFrame(WindowFrame.PeerType.ROWS, 
true, 0, false, 0, null);
+    final WindowFrame theFrame = WindowFrame.rows(null, 0);
     final AggregatorFactory[] theAggs = {
         new LongSumAggregatorFactory("sum", "intCol")
     };
diff --git 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java
 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java
index d5b11f7a612..41ceb315a04 100644
--- 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java
@@ -25,10 +25,8 @@ import 
org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.operator.ColumnWithDirection;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
 import org.apache.druid.query.operator.window.WindowFrame;
-import org.apache.druid.query.operator.window.WindowFrame.PeerType;
 import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
@@ -65,7 +63,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 0, null),
+        WindowFrame.rows(0, 0),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -91,7 +89,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 2, null),
+        WindowFrame.rows(-1, 2),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -117,7 +115,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 2, null),
+        WindowFrame.rows(0, 2),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -143,7 +141,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -2, false, 0, null),
+        WindowFrame.rows(-2, 0),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -169,7 +167,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 7, null),
+        WindowFrame.rows(-5, 7),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -197,7 +195,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 1, null),
+        WindowFrame.rows(-5, 1),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -225,7 +223,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
+        WindowFrame.rows(-5, 0),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -253,7 +251,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -1, false, 7, null),
+        WindowFrame.rows(-1, 7),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -281,7 +279,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null),
+        WindowFrame.rows(0, 7),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -309,7 +307,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, false, 7, null),
+        WindowFrame.rows(0, 7),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -337,7 +335,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, -5, false, 0, null),
+        WindowFrame.rows(-5, 0),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
@@ -371,7 +369,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null),
+        WindowFrame.unbounded(),
         new AggregatorFactory[]{
             new LongSumAggregatorFactory("sumFromLong", "intCol"),
             new LongSumAggregatorFactory("sumFromDouble", "doubleCol"),
@@ -409,7 +407,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, false, 0, null),
+        WindowFrame.rows(null, 0),
         new AggregatorFactory[]{
             new LongMaxAggregatorFactory("cummMax", "intCol"),
             new DoubleSumAggregatorFactory("cummSum", "doubleCol")
@@ -443,7 +441,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, false, 0, true, 0, null),
+        WindowFrame.rows(0, null),
         new AggregatorFactory[]{
             new LongMaxAggregatorFactory("cummMax", "intCol"),
             new DoubleSumAggregatorFactory("cummSum", "doubleCol")
@@ -465,7 +463,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeOrderBy()
   {
-    WindowFrame frame = 
WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1"));
+    WindowFrame frame = WindowFrame.forOrderBy("c1");
     int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2};
     int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2};
     int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13};
@@ -476,14 +474,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeB1()
   {
-    WindowFrame frame = new WindowFrame(
-        PeerType.RANGE,
-        false,
-        -1,
-        false,
-        0,
-        Collections.singletonList(ColumnWithDirection.ascending("c1"))
-    );
+    WindowFrame frame = WindowFrame.groups(-1, 0, 
Collections.singletonList("c1"));
 
     int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
     int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
@@ -495,14 +486,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeA1()
   {
-    WindowFrame frame = new WindowFrame(
-        PeerType.RANGE,
-        false,
-        0,
-        false,
-        1,
-        Collections.singletonList(ColumnWithDirection.ascending("c1"))
-    );
+    WindowFrame frame = WindowFrame.groups(0, 1, 
Collections.singletonList("c1"));
 
     int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
     int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
@@ -514,14 +498,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeB1A1()
   {
-    WindowFrame frame = new WindowFrame(
-        PeerType.RANGE,
-        false,
-        -1,
-        false,
-        1,
-        Collections.singletonList(ColumnWithDirection.ascending("c1"))
-    );
+    WindowFrame frame = WindowFrame.groups(-1, 1, 
Collections.singletonList("c1"));
 
     int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5};
     int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5};
@@ -534,14 +511,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeB1A1_2()
   {
-    WindowFrame frame = new WindowFrame(
-        PeerType.RANGE,
-        false,
-        -1,
-        false,
-        1,
-        Collections.singletonList(ColumnWithDirection.ascending("c1"))
-    );
+    WindowFrame frame = WindowFrame.groups(-1, 1, 
Collections.singletonList("c1"));
 
     int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5};
     int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5};
@@ -553,14 +523,7 @@ public class FramedOnHeapAggregatableTest extends 
SemanticTestBase
   @Test
   public void testRangeB1A2()
   {
-    WindowFrame frame = new WindowFrame(
-        PeerType.RANGE,
-        false,
-        -1,
-        false,
-        2,
-        Collections.singletonList(ColumnWithDirection.ascending("c1"))
-    );
+    WindowFrame frame = WindowFrame.groups(-1, 2, 
Collections.singletonList("c1"));
 
     int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
     int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1};
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 39c180530f0..8e1fc3ee275 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -449,19 +449,22 @@ public class Windowing
       if (group.lowerBound.isUnbounded() && group.upperBound.isUnbounded()) {
         return WindowFrame.unbounded();
       }
-      return new WindowFrame(
-          group.isRows ? WindowFrame.PeerType.ROWS : 
WindowFrame.PeerType.RANGE,
-          group.lowerBound.isUnbounded(),
-          figureOutOffset(group.lowerBound),
-          group.upperBound.isUnbounded(),
-          figureOutOffset(group.upperBound),
-          group.isRows ? null : getOrdering()
-      );
+      if (group.isRows) {
+        return WindowFrame.rows(getBoundAsInteger(group.lowerBound), 
getBoundAsInteger(group.upperBound));
+      } else {
+        /* Right now we support GROUPS based framing in the native layer;
+         * but the SQL layer doesn't accept that as of now.
+         */
+        return WindowFrame.groups(getBoundAsInteger(group.lowerBound), 
getBoundAsInteger(group.upperBound), getOrderingColumNames());
+      }
     }
 
-    private int figureOutOffset(RexWindowBound bound)
+    private Integer getBoundAsInteger(RexWindowBound bound)
     {
-      if (bound.isUnbounded() || bound.isCurrentRow()) {
+      if (bound.isUnbounded()) {
+        return null;
+      }
+      if (bound.isCurrentRow()) {
         return 0;
       }
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 7b5749bd8a1..5ceba91eb37 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -15900,7 +15900,7 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
                     OperatorFactoryBuilders.naivePartitionOperator(),
                     OperatorFactoryBuilders.windowOperators(
                         OperatorFactoryBuilders.framedAggregateProcessor(
-                            
WindowFrame.forOrderBy(ColumnWithDirection.ascending("d0")),
+                            WindowFrame.forOrderBy("d0"),
                             new LongSumAggregatorFactory("w0", "a0")
                         )
                     )
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
index 9fdd73fb9c7..165b4aa3f63 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
@@ -45,8 +45,10 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.junit.Assert.assertEquals;
@@ -66,6 +68,11 @@ public class CalciteWindowQueryTest extends 
BaseCalciteQueryTest
 
   private static final ObjectMapper YAML_JACKSON = new DefaultObjectMapper(new 
YAMLFactory(), "tests");
 
+  private static final Map<String, Object> DEFAULT_QUERY_CONTEXT = 
ImmutableMap.of(
+      PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
+      QueryContexts.ENABLE_DEBUG, true
+  );
+
   public static Object[] parametersForWindowQueryTest() throws Exception
   {
     final URL windowFolderUrl = 
ClassLoader.getSystemResource("calcite/tests/window");
@@ -184,34 +191,17 @@ public class CalciteWindowQueryTest extends 
BaseCalciteQueryTest
         log.info("Actual results:\n%s", sb.toString());
       }
     }
-  }
-
-  @MethodSource("parametersForWindowQueryTest")
-  @ParameterizedTest(name = "{0}")
-  @SuppressWarnings("unchecked")
-  public void windowQueryTest(String filename) throws Exception
-  {
-    TestCase testCase = new TestCase(filename);
-
-    assumeTrue(testCase.getType() != TestType.failingTest);
 
-    if (testCase.getType() == TestType.operatorValidation) {
-      testBuilder()
-          .skipVectorize(true)
-          .sql(testCase.getSql())
-          .queryContext(ImmutableMap.of(
-              PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
-              QueryContexts.ENABLE_DEBUG, true
-              ))
-          .addCustomVerification(QueryVerification.ofResults(testCase))
-          .run();
+    public Map<? extends String, ? extends Object> getQueryContext()
+    {
+      return input.queryContext == null ? Collections.emptyMap() : 
input.queryContext;
     }
   }
 
   @MethodSource("parametersForWindowQueryTest")
   @ParameterizedTest(name = "{0}")
   @SuppressWarnings("unchecked")
-  public void windowQueryTestWithCustomContextMaxSubqueryBytes(String 
filename) throws Exception
+  public void windowQueryTest(String filename) throws Exception
   {
     TestCase testCase = new TestCase(filename);
 
@@ -221,10 +211,11 @@ public class CalciteWindowQueryTest extends 
BaseCalciteQueryTest
       testBuilder()
           .skipVectorize(true)
           .sql(testCase.getSql())
-          .queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
-                                        PlannerContext.CTX_ENABLE_WINDOW_FNS, 
true,
-                                        QueryContexts.MAX_SUBQUERY_BYTES_KEY, 
"100000"
-                        )
+          .queryContext(
+              ImmutableMap.<String, Object>builder()
+                  .putAll(DEFAULT_QUERY_CONTEXT)
+                  .putAll(testCase.getQueryContext())
+                  .build()
           )
           .addCustomVerification(QueryVerification.ofResults(testCase))
           .run();
@@ -241,10 +232,7 @@ public class CalciteWindowQueryTest extends 
BaseCalciteQueryTest
              + "where countryName in ('Austria', 'Republic of Korea') "
              + "and (cityName in ('Vienna', 'Seoul') or cityName is null)\n"
              + "group by countryName, cityName, channel")
-        .queryContext(ImmutableMap.of(
-            PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
-            QueryContexts.ENABLE_DEBUG, true
-        ))
+        .queryContext(DEFAULT_QUERY_CONTEXT)
         .expectedResults(
             ResultMatchMode.RELAX_NULLS,
             ImmutableList.of(
@@ -277,9 +265,13 @@ public class CalciteWindowQueryTest extends 
BaseCalciteQueryTest
       failingTest,
       operatorValidation
     }
+
     @JsonProperty
     public TestType type;
 
+    @JsonProperty
+    public Map<String, String> queryContext;
+
     @JsonProperty
     public String sql;
 
diff --git 
a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest 
b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
index 0c9d88b5041..a2f82ff2905 100644
--- a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
@@ -15,13 +15,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame:
-        peerType: "ROWS"
-        lowUnbounded: true
-        lowOffset: 0
-        uppUnbounded: true
-        uppOffset: 0
-        orderBy: null
+      frame: { type: "rows" }
       aggregations:
         - { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
   - type: "naiveSort"
@@ -33,13 +27,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame:
-        peerType: "ROWS"
-        lowUnbounded: true
-        lowOffset: 0
-        uppUnbounded: true
-        uppOffset: 0
-        orderBy: null
+      frame: { type: "rows" }
       aggregations:
         - { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
 expectedResults:
diff --git 
a/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest 
b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
index 16dbe924fdb..e65e27e8794 100644
--- a/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/aggregateConstant.sqlTest
@@ -13,7 +13,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: "rows" }
       aggregations:
         - { type: "count", name: "w0" }
 
diff --git 
a/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest 
b/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest
index d5a324c9e2d..aa0a4a2a019 100644
--- a/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/defaultBoundCurrentRow.sqlTest
@@ -19,60 +19,43 @@ expectedOperators:
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: true
-          lowOffset: 0
-          uppUnbounded: false
-          uppOffset: 0
-          orderBy: null
+          type: "rows"
+          upperOffset: 0
       aggregations:
         - { type: "count", name: "w0" }
   - type: "window"
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: false
-          lowOffset: -1
-          uppUnbounded: false
-          uppOffset: 0
-          orderBy: null
+          type: "rows"
+          lowerOffset: -1
+          upperOffset: 0
       aggregations:
         - { type: "count", name: "w1" }
   - type: "window"
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: false
-          lowOffset: 0
-          uppUnbounded: false
-          uppOffset: 0
-          orderBy: null
+          type: "rows"
+          lowerOffset: 0
+          upperOffset: 0
       aggregations:
         - { type: "count", name: "w2" }
   - type: "window"
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: false
-          lowOffset: 0
-          uppUnbounded: false
-          uppOffset: 1
-          orderBy: null
+          type: "rows"
+          lowerOffset: 0
+          upperOffset: 1
       aggregations:
         - { type: "count", name: "w3" }
   - type: "window"
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: false
-          lowOffset: 0
-          uppUnbounded: true
-          uppOffset: 0
-          orderBy: null
+          type: "rows"
+          lowerOffset: 0
       aggregations:
         - { type: "count", name: "w4" }
 
diff --git a/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest 
b/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest
index 7c9dae4aad3..2b6f7f7fddb 100644
--- a/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest
@@ -1,9 +1,9 @@
-type: "failingTest"
+type: "operatorValidation"
 
 sql: |
   SELECT
     m1,
-    COUNT(m1) OVER () cc
+    SUM(m1) OVER () cc
   FROM druid.foo
 
 expectedOperators:
@@ -12,18 +12,16 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: "rows" }
       aggregations:
-        - type: "filtered"
-          aggregator: {"type":"count","name":"w0"}
-          filter:
-            type: not
-            field: {"type":"null","column":"m1"}
-          name: null
+        - type: doubleSum
+          name: w0
+          fieldName: m1
+
 expectedResults:
-  - [1.0,6]
-  - [2.0,6]
-  - [3.0,6]
-  - [4.0,6]
-  - [5.0,6]
-  - [6.0,6]
+  - [1.0,21.0]
+  - [2.0,21.0]
+  - [3.0,21.0]
+  - [4.0,21.0]
+  - [5.0,21.0]
+  - [6.0,21.0]
diff --git a/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest 
b/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest
index 7a579c3fc33..4d78b197e31 100644
--- a/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/no_grouping2.sqlTest
@@ -12,7 +12,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: rows }
       aggregations:
         - type: "doubleSum"
           name: "w0"
diff --git a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest 
b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
index 0e66ed87460..1e4de22dfca 100644
--- a/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/rank_handling.sqlTest
@@ -1,5 +1,8 @@
 type: "operatorValidation"
 
+queryContext:
+  maxSubqueryBytes: 100000
+
 sql: |
   SELECT
       __time
diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest 
b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest
index 9ca9f88e850..84bd5ca71af 100644
--- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest
@@ -14,12 +14,9 @@ expectedOperators:
     processor:
       type: "framedAgg"
       frame:
-          peerType: "RANGE"
-          lowUnbounded: true
-          lowOffset: 0
-          uppUnbounded: false
-          uppOffset: 0
-          orderBy: [ {column: "d0", direction: ASC} ]
+          type: groups
+          upperOffset: 0
+          orderByColumns: [ "d0" ]
       aggregations:
         - { type: "longSum", name: "w0", fieldName: "a0" }
 
diff --git a/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest 
b/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest
index 0a86a691e26..9b5aa6e1365 100644
--- a/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/virtualColumns.sqlTest
@@ -19,7 +19,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: "rows" }
       aggregations:
         - { type: "doubleMin", name: "w0", fieldName: "_v0" }
         - { type: "longMin", name: "w1", fieldName: "v1" }
diff --git 
a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest
 
b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest
index cc59868482a..b4ef8006ea9 100644
--- 
a/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest
+++ 
b/sql/src/test/resources/calcite/tests/window/wikipediaAggregationsMultipleOrdering.sqlTest
@@ -16,12 +16,9 @@ expectedOperators:
     processor:
       type: "framedAgg"
       frame:
-        peerType: "ROWS"
-        lowUnbounded: false
-        lowOffset: -3
-        uppUnbounded: false
-        uppOffset: 2
-        orderBy: null
+        type: rows
+        lowerOffset: -3
+        upperOffset: 2
       aggregations:
         - { type: "longSum", name: "w0", fieldName: "a0" }
   - { type: "naiveSort", columns: [ { column: "d1", direction: "ASC" }, { 
column: "a0", direction: "ASC"} ]}
diff --git 
a/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest
 
b/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest
index 9368f00e9b4..ebcc060eaa5 100644
--- 
a/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest
+++ 
b/sql/src/test/resources/calcite/tests/window/wikipediaCumulativeOrdered.sqlTest
@@ -39,14 +39,9 @@ expectedOperators:
         - { "type": "cumeDist", "group": [ "a0" ], "outputColumn": "w9" }
         - type: "framedAgg"
           frame:
-            peerType: "RANGE"
-            lowUnbounded: true
-            lowOffset: 0
-            uppUnbounded: false
-            uppOffset: 0
-            orderBy:
-              - column: a0
-                direction: ASC
+            type: groups
+            upperOffset: 0
+            orderByColumns: [ a0 ]
           aggregations:
             - { "type": "longSum", "name": "w0", "fieldName": "a0" }
 
diff --git 
a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
 
b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
index c25f1ff0352..87873d44c48 100644
--- 
a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
+++ 
b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
@@ -15,11 +15,9 @@ expectedOperators:
     processor:
       type: "framedAgg"
       frame:
-          peerType: "ROWS"
-          lowUnbounded: false
-          lowOffset: -3
-          uppUnbounded: false
-          uppOffset: 2
+          type: "rows"
+          lowerOffset: -3
+          upperOffset: 2
           orderBy: null
       aggregations:
         - { type: "longSum", name: "w0", fieldName: "a0" }
diff --git 
a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest 
b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest
index 1e75e69b97b..3843519aa79 100644
--- 
a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest
+++ 
b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartition.sqlTest
@@ -19,7 +19,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: "rows" }
       aggregations:
         - { "type": "longSum", "name": "w0", "fieldName": "a0" }
   - type: "window"
diff --git 
a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest
 
b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest
index d310f6a8f1c..4939057621e 100644
--- 
a/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest
+++ 
b/sql/src/test/resources/calcite/tests/window/wikipediaSimplePartitionInitialSort.sqlTest
@@ -28,7 +28,7 @@ expectedOperators:
         - { "type": "last", "inputColumn": "a0", "outputColumn": "w2" }
         - { "type": "percentile", "outputColumn": "w3", "numBuckets": 3 }
         - type: "framedAgg"
-          frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+          frame: { type: "rows" }
           aggregations:
             - { "type": "longSum", "name": "w0", "fieldName": "a0" }
   - type: "window"
diff --git 
a/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest 
b/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest
index c96b979c0da..7c7fd03c3c8 100644
--- a/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest
+++ b/sql/src/test/resources/calcite/tests/window/windowed_long_null.sqlTest
@@ -19,7 +19,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: rows }
       aggregations:
         - { type: "longMin", name: "w0", fieldName: "l2" }
   - type: "naiveSort"
@@ -31,7 +31,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "RANGE", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: false, uppOffset: 0, orderBy: [{ column: l1, direction: ASC }] }
+      frame: { type: groups, upperOffset: 0, orderByColumns: [ l1 ] }
       aggregations:
         - { type: "longMin", name: "w1", fieldName: "l2" }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to