kgyrtkirk commented on code in PR #15365:
URL: https://github.com/apache/druid/pull/15365#discussion_r1400681001


##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -123,6 +149,36 @@ public String toString()
            ", lowerOffset=" + lowerOffset +
            ", upperUnbounded=" + upperUnbounded +
            ", upperOffset=" + upperOffset +
+           ", orderBy=" + orderBy +
            '}';
   }
+
+  public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
+  {
+    return new WindowFrame(PeerType.RANGE, true, 0, false, 0, 
Lists.newArrayList(orderBy));
+  }
+
+  public List<String> getOrderByColNames()
+  {
+    if (orderBy == null) {
+      return Collections.emptyList();
+    }
+    return 
orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
+  }
+
+  public int getLowerOffsetClamped(int maxValue)
+  {
+    if (lowerUnbounded) {
+      return maxValue;
+    }

Review Comment:
   `maxValue` should be renamed as `maxRows` or something....
   
   If we know there will be at most `n` rows; then the unbounded is technically 
`n`



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
         }
       }
     } else {
-      throw new UOE("RANGE peer groupings are unsupported");
+      return computeRangeAggregates(aggFactories, frame);
+    }
+  }
+
+  private RowsAndColumns computeRangeAggregates(
+      AggregatorFactory[] aggFactories,
+      WindowFrame frame)
+  {
+    RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+    int numRows = rac.numRows();
+    Object[][] results = new Object[aggFactories.length][numRows];
+
+    AggCell cell = new AggCell(rac, aggFactories);
+
+    for (Range xRange : iter) {
+
+      cell.moveTo(xRange.inputRows);
+      // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) { 
return Const };
+
+      // note: would be better with results.setX()?
+      cell.setOutputs(results, xRange.outputRows);
     }
+    return makeReturnRAC(aggFactories, results);
   }
 
+  static class RangeIteratorForWindow implements Iterable<Range>
+  {
+    private final int[] rangeToRowId;
+    private final int numRows;
+    private final int numRanges;
+    private final int lowerOffset;
+    private final int upperOffset;
+
+    public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame 
frame)
+    {
+      assert (frame.getPeerType() == PeerType.RANGE);
+      rangeToRowId = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+      numRows = rac.numRows();
+      numRanges = rangeToRowId.length - 1;
+      lowerOffset = frame.getLowerOffsetClamped(numRanges);
+      upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+    }
+
+    @Override
+    public Iterator<Range> iterator()
+    {
+      return new Iterator<Range>()
+      {
+        int currentRowIndex = 0;
+        int currentRangeIndex = 0;
+
+        @Override
+        public boolean hasNext()
+        {
+          return currentRowIndex < numRows;
+        }
+
+        @Override
+        public Range next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          // TODO: invert listing order at the end to get benefits of 
incremenental aggregations

Review Comment:
   TODO / FIXME and friends appear in  the Tasks view; which could be used as a 
navigation aid; I use it to navigate to place I want to decide what to do.
   
   I wanted to make a note that mentioned optimization is within pretty close 
reach - so I might add it in this PR.
   But I agree after the patch is comitted it might never get fixed - and an 
issue tracker should be used.
   
   Maybe some static check should ensure that no TODO / FIXME is added? That's 
kinda the reason I see that the distinction between a draft and a normal pr 
might be usefull; a `TODO` should not make the PR checks fail...but that would 
be the case if I would add a checkstyle rule...
   
   but I've went a little far :D I'll most likely address this before finishing 
with it - or create a ticket  :)
   



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
         }
       }
     } else {
-      throw new UOE("RANGE peer groupings are unsupported");
+      return computeRangeAggregates(aggFactories, frame);
+    }
+  }
+
+  private RowsAndColumns computeRangeAggregates(
+      AggregatorFactory[] aggFactories,
+      WindowFrame frame)
+  {
+    RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+    int numRows = rac.numRows();
+    Object[][] results = new Object[aggFactories.length][numRows];
+
+    AggCell cell = new AggCell(rac, aggFactories);
+
+    for (Range xRange : iter) {
+
+      cell.moveTo(xRange.inputRows);
+      // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) { 
return Const };
+
+      // note: would be better with results.setX()?
+      cell.setOutputs(results, xRange.outputRows);
     }
+    return makeReturnRAC(aggFactories, results);
   }
 
+  static class RangeIteratorForWindow implements Iterable<Range>
+  {
+    private final int[] rangeToRowId;
+    private final int numRows;
+    private final int numRanges;
+    private final int lowerOffset;
+    private final int upperOffset;
+
+    public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame 
frame)
+    {
+      assert (frame.getPeerType() == PeerType.RANGE);
+      rangeToRowId = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+      numRows = rac.numRows();
+      numRanges = rangeToRowId.length - 1;
+      lowerOffset = frame.getLowerOffsetClamped(numRanges);
+      upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+    }
+
+    @Override
+    public Iterator<Range> iterator()
+    {
+      return new Iterator<Range>()
+      {
+        int currentRowIndex = 0;
+        int currentRangeIndex = 0;
+
+        @Override
+        public boolean hasNext()
+        {
+          return currentRowIndex < numRows;
+        }
+
+        @Override
+        public Range next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          // TODO: invert listing order at the end to get benefits of 
incremenental aggregations
+          Range r = new Range(
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(0)),
+                  rangeToRowIndex(relativeRangeId(1))
+              ),
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(-lowerOffset)),
+                  rangeToRowIndex(relativeRangeId(upperOffset))
+              )
+          );
+
+          currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+          currentRangeIndex++;
+          return r;
+        }
+
+        private int rangeToRowIndex(int rangeId)
+        {
+          return rangeToRowId[rangeId];
+        }
+
+        private int relativeRangeId(int rangeOffset)
+        {
+          int rangeId = currentRangeIndex + rangeOffset;
+          if (rangeId < 0) {
+            return 0;
+          }
+          if (rangeId >= numRanges) {
+            return numRanges;
+          }
+          return rangeId;
+        }
+      };
+    }
+  }
+
+
+  /**
+   * Basic [a,b) interval; left inclusive/right exclusive.
+   */
+  static class Interval implements Iterable<Integer>
+  {
+    int a;
+    int b;
+
+    public static Interval of(int u, int v)
+    {
+      return new Interval(u, v);
+    }
+
+    public Interval(int u, int v)
+    {
+      this.a = u;
+      this.b = v;
+    }
+
+    @Override
+    public Iterator<Integer> iterator()
+    {
+      return new Iterator<Integer>()
+      {
+        int current = a;
+
+        @Override
+        public Integer next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          return current++;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+          return current < b;
+        }
+      };
+
+    }
+  }
+
+  /**
+   * Represents a range between U (inclusive) and V (exclusive).

Review Comment:
   outdate apidoc :facepalm: 



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
         }
       }
     } else {
-      throw new UOE("RANGE peer groupings are unsupported");
+      return computeRangeAggregates(aggFactories, frame);
+    }
+  }
+
+  private RowsAndColumns computeRangeAggregates(
+      AggregatorFactory[] aggFactories,
+      WindowFrame frame)
+  {
+    RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+    int numRows = rac.numRows();
+    Object[][] results = new Object[aggFactories.length][numRows];
+
+    AggCell cell = new AggCell(rac, aggFactories);
+
+    for (Range xRange : iter) {
+
+      cell.moveTo(xRange.inputRows);
+      // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) { 
return Const };
+
+      // note: would be better with results.setX()?
+      cell.setOutputs(results, xRange.outputRows);
     }
+    return makeReturnRAC(aggFactories, results);
   }
 
+  static class RangeIteratorForWindow implements Iterable<Range>
+  {
+    private final int[] rangeToRowId;
+    private final int numRows;
+    private final int numRanges;
+    private final int lowerOffset;
+    private final int upperOffset;
+
+    public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame 
frame)
+    {
+      assert (frame.getPeerType() == PeerType.RANGE);
+      rangeToRowId = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+      numRows = rac.numRows();
+      numRanges = rangeToRowId.length - 1;
+      lowerOffset = frame.getLowerOffsetClamped(numRanges);
+      upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+    }
+
+    @Override
+    public Iterator<Range> iterator()
+    {
+      return new Iterator<Range>()
+      {
+        int currentRowIndex = 0;
+        int currentRangeIndex = 0;
+
+        @Override
+        public boolean hasNext()
+        {
+          return currentRowIndex < numRows;
+        }
+
+        @Override
+        public Range next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          // TODO: invert listing order at the end to get benefits of 
incremenental aggregations
+          Range r = new Range(
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(0)),
+                  rangeToRowIndex(relativeRangeId(1))
+              ),
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(-lowerOffset)),
+                  rangeToRowIndex(relativeRangeId(upperOffset))
+              )
+          );
+
+          currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+          currentRangeIndex++;
+          return r;
+        }
+
+        private int rangeToRowIndex(int rangeId)
+        {
+          return rangeToRowId[rangeId];
+        }
+
+        private int relativeRangeId(int rangeOffset)
+        {
+          int rangeId = currentRangeIndex + rangeOffset;
+          if (rangeId < 0) {
+            return 0;
+          }
+          if (rangeId >= numRanges) {
+            return numRanges;
+          }
+          return rangeId;
+        }
+      };
+    }
+  }
+
+
+  /**
+   * Basic [a,b) interval; left inclusive/right exclusive.
+   */
+  static class Interval implements Iterable<Integer>
+  {
+    int a;
+    int b;
+
+    public static Interval of(int u, int v)
+    {
+      return new Interval(u, v);
+    }
+
+    public Interval(int u, int v)
+    {
+      this.a = u;
+      this.b = v;
+    }
+
+    @Override
+    public Iterator<Integer> iterator()
+    {
+      return new Iterator<Integer>()
+      {
+        int current = a;
+
+        @Override
+        public Integer next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          return current++;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+          return current < b;
+        }
+      };
+
+    }
+  }
+
+  /**
+   * Represents a range between U (inclusive) and V (exclusive).
+   */
+  static class Range
+  {
+    Interval outputRows;
+    Interval inputRows;
+
+    public Range(Interval outputRows, Interval inputRows)
+    {
+      this.outputRows = outputRows;
+      this.inputRows = inputRows;
+    }
+  }
+
+  static class AggCell
+  {
+    private AggregatorFactory[] aggFactories;
+    Interval currentRows = new Interval(0, 0);
+    private final AtomicInteger rowIdProvider;
+    private final ColumnSelectorFactory columnSelectorFactory;
+
+    private final Aggregator[] aggregators;
+
+
+    AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
+    {
+      this.aggFactories = aggFactories;
+      aggregators = new Aggregator[aggFactories.length];
+
+      rowIdProvider = new AtomicInteger(0);
+      columnSelectorFactory = 
ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
+
+      newAggregators();
+    }
+
+    private void newAggregators()
+    {
+      for (int i = 0; i < aggFactories.length; i++) {
+        aggregators[i] = aggFactories[i].factorize(columnSelectorFactory);
+      }
+    }
+
+    /**
+     * Reposition aggregation window to reflect the given rows.
+     */
+    public void moveTo(Interval newRows)
+    {
+      // incremental addition of additional values
+      if (currentRows.a == newRows.a && currentRows.b < newRows.b) {
+        for (int i = currentRows.b; i < newRows.b; i++) {
+          aggregate(i);
+        }
+        currentRows = newRows;
+        return;
+      }
+      newAggregators();
+      for (int i : newRows) {
+        aggregate(i);
+      }
+      currentRows = newRows;
+    }
+
+    public void setOutputs(Object[][] results, Interval outputRows)
+    {
+      for (int aggIdx = 0; aggIdx < aggFactories.length; aggIdx++) {
+        Object aggValue = aggregators[aggIdx].get();
+        for (int rowIdx = outputRows.a; rowIdx < outputRows.b; rowIdx++) {
+          results[aggIdx][rowIdx] = aggValue;
+        }

Review Comment:
   Yes; I was also thinking about this - but all the aggregates I've taken a 
look have either:
   * used a primitive type (`max` / etc)
   * created some more complicated object 
   * done a copy(!) during return 
[HllSketchBuildAggregator](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java#L64)
   * ...but I've found at least one which was exposing its internals 
[Kll](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchBuildAggregator.java#L42)
 
   
   I think instead of introducing a complicated copy mechanisms possible 
alternatives could be:
   * either add to the documentation that `get()` in under any circumstances 
**may not return internal parts of the `Aggregator`** ; and possibly cover it 
with a test - so that existing stuff lives up to that contract
      * `get()` is not documented [at all](https://github.com/apache/druid
   
/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java#L67)
   * introduce a marker interface like `GetIsSafe` ; mark good 
aggregators...and throw an exception here  stating that the aggregator is 
not-good-enough if its not present
   
   



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
         }
       }
     } else {
-      throw new UOE("RANGE peer groupings are unsupported");
+      return computeRangeAggregates(aggFactories, frame);
+    }
+  }
+
+  private RowsAndColumns computeRangeAggregates(
+      AggregatorFactory[] aggFactories,
+      WindowFrame frame)
+  {
+    RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+    int numRows = rac.numRows();
+    Object[][] results = new Object[aggFactories.length][numRows];
+
+    AggCell cell = new AggCell(rac, aggFactories);
+
+    for (Range xRange : iter) {
+
+      cell.moveTo(xRange.inputRows);
+      // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) { 
return Const };
+
+      // note: would be better with results.setX()?
+      cell.setOutputs(results, xRange.outputRows);
     }
+    return makeReturnRAC(aggFactories, results);
   }
 
+  static class RangeIteratorForWindow implements Iterable<Range>
+  {
+    private final int[] rangeToRowId;
+    private final int numRows;
+    private final int numRanges;
+    private final int lowerOffset;
+    private final int upperOffset;
+
+    public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame 
frame)
+    {
+      assert (frame.getPeerType() == PeerType.RANGE);
+      rangeToRowId = 
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+      numRows = rac.numRows();
+      numRanges = rangeToRowId.length - 1;
+      lowerOffset = frame.getLowerOffsetClamped(numRanges);
+      upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+    }
+
+    @Override
+    public Iterator<Range> iterator()
+    {
+      return new Iterator<Range>()
+      {
+        int currentRowIndex = 0;
+        int currentRangeIndex = 0;
+
+        @Override
+        public boolean hasNext()
+        {
+          return currentRowIndex < numRows;
+        }
+
+        @Override
+        public Range next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          // TODO: invert listing order at the end to get benefits of 
incremenental aggregations
+          Range r = new Range(
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(0)),
+                  rangeToRowIndex(relativeRangeId(1))
+              ),
+              Interval.of(
+                  rangeToRowIndex(relativeRangeId(-lowerOffset)),
+                  rangeToRowIndex(relativeRangeId(upperOffset))
+              )
+          );
+
+          currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+          currentRangeIndex++;
+          return r;
+        }
+
+        private int rangeToRowIndex(int rangeId)
+        {
+          return rangeToRowId[rangeId];
+        }
+
+        private int relativeRangeId(int rangeOffset)
+        {
+          int rangeId = currentRangeIndex + rangeOffset;
+          if (rangeId < 0) {
+            return 0;
+          }
+          if (rangeId >= numRanges) {
+            return numRanges;
+          }
+          return rangeId;
+        }
+      };
+    }
+  }
+
+
+  /**
+   * Basic [a,b) interval; left inclusive/right exclusive.
+   */
+  static class Interval implements Iterable<Integer>
+  {
+    int a;
+    int b;
+
+    public static Interval of(int u, int v)
+    {
+      return new Interval(u, v);
+    }
+
+    public Interval(int u, int v)
+    {
+      this.a = u;
+      this.b = v;
+    }
+
+    @Override
+    public Iterator<Integer> iterator()
+    {
+      return new Iterator<Integer>()
+      {
+        int current = a;
+
+        @Override
+        public Integer next()
+        {
+          if (!hasNext()) {
+            throw new IllegalStateException();
+          }
+          return current++;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+          return current < b;
+        }
+      };
+
+    }
+  }
+
+  /**
+   * Represents a range between U (inclusive) and V (exclusive).
+   */
+  static class Range
+  {
+    Interval outputRows;
+    Interval inputRows;
+
+    public Range(Interval outputRows, Interval inputRows)
+    {
+      this.outputRows = outputRows;
+      this.inputRows = inputRows;
+    }
+  }
+
+  static class AggCell
+  {
+    private AggregatorFactory[] aggFactories;
+    Interval currentRows = new Interval(0, 0);
+    private final AtomicInteger rowIdProvider;
+    private final ColumnSelectorFactory columnSelectorFactory;
+
+    private final Aggregator[] aggregators;
+
+
+    AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)

Review Comment:
   this class has a contract to be able to aggregate stuff on its own - for 
that its a sideeffect that it needs to `own`  a `rowIdProvider`
   
   I don't want to give external control to that; as if that would be 
accessible someone might use it; and/or expect it to be at the same point after 
calling a method on this class...
   
   



##########
processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java:
##########
@@ -456,4 +459,140 @@ public void testReverseCumulativeAggregation()
         .allColumnsRegistered()
         .validate(results);
   }
+
+
+
+  @Test
+  public void testRangeOrderBy()
+  {
+    WindowFrame frame = 
WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1"));
+    int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2};
+    int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2};
+    int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+  @Test
+  public void testRangeB1()
+  {
+    WindowFrame frame = new WindowFrame(
+        PeerType.RANGE,
+        false,
+        1,
+        false,
+        0,
+        Collections.singletonList(ColumnWithDirection.ascending("c1"))
+    );
+
+    int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+    int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+    int[] resVals = new int[] {0, 1, 3, 3, 5, 7, 9};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+  @Test
+  public void testRangeA1()
+  {
+    WindowFrame frame = new WindowFrame(
+        PeerType.RANGE,
+        false,
+        0,
+        false,
+        1,
+        Collections.singletonList(ColumnWithDirection.ascending("c1"))
+    );
+
+    int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+    int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+    int[] resVals = new int[] {1, 3, 5, 5, 7, 9, 5};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+  @Test
+  public void testRangeB1A1()
+  {
+    WindowFrame frame = new WindowFrame(
+        PeerType.RANGE,
+        false,
+        1,
+        false,
+        1,
+        Collections.singletonList(ColumnWithDirection.ascending("c1"))
+    );
+
+    int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5};
+    int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5};
+    int[] resVals = new int[] {1, 3, 6, 9, 12, 9};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+
+  @Test
+  public void testRangeB1A1_2()
+  {
+    WindowFrame frame = new WindowFrame(
+        PeerType.RANGE,
+        false,
+        1,
+        false,
+        1,
+        Collections.singletonList(ColumnWithDirection.ascending("c1"))
+    );
+
+    int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5};
+    int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5};
+    int[] resVals = new int[] {1, 1, 3, 6, 9, 9, 12, 12, 9};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+  @Test
+  public void testRangeB1A2()
+  {
+    WindowFrame frame = new WindowFrame(
+        PeerType.RANGE,
+        false,
+        1,
+        false,
+        2,
+        Collections.singletonList(ColumnWithDirection.ascending("c1"))
+    );
+
+    int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
+    int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1};
+    int[] resVals = new int[] {13, 13, 13, 16, 16, 16, 12, 12, 12, 12, 8, 8, 
8};
+
+    simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+  }
+
+
+  private void simpleWindowingTest(WindowFrame frame, int[] c1Vals, int[] 
c2Vals, int[] resVals)
+  {
+    Map<String, Column> map = new LinkedHashMap<>();
+    map.put("c1", new IntArrayColumn(c1Vals));
+    map.put("c2", new IntArrayColumn(c2Vals));
+
+    RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
+
+    FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
+
+    final RowsAndColumns results = agger.aggregateAll(
+        frame,
+        new AggregatorFactory[] {
+            new LongSumAggregatorFactory("res", "c2")
+        }
+    );
+
+    new RowsAndColumnsHelper()
+    .expectColumn("c1", c1Vals)
+    .expectColumn("c2", c2Vals)
+    .expectColumn("res", resVals)
+        .allColumnsRegistered()
+        .validate(results);
+  }

Review Comment:
   I've added one - and it just passed because it does a `copy` in the 
[get()](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java#L64)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to