kgyrtkirk commented on code in PR #15365:
URL: https://github.com/apache/druid/pull/15365#discussion_r1400681001
##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -123,6 +149,36 @@ public String toString()
", lowerOffset=" + lowerOffset +
", upperUnbounded=" + upperUnbounded +
", upperOffset=" + upperOffset +
+ ", orderBy=" + orderBy +
'}';
}
+
+ public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
+ {
+ return new WindowFrame(PeerType.RANGE, true, 0, false, 0,
Lists.newArrayList(orderBy));
+ }
+
+ public List<String> getOrderByColNames()
+ {
+ if (orderBy == null) {
+ return Collections.emptyList();
+ }
+ return
orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
+ }
+
+ public int getLowerOffsetClamped(int maxValue)
+ {
+ if (lowerUnbounded) {
+ return maxValue;
+ }
Review Comment:
`maxValue` should be renamed as `maxRows` or something....
If we know there will be at most `n` rows; then the unbounded is technically
`n`
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
Review Comment:
TODO / FIXME and friends appear in the Tasks view; which could be used as a
navigation aid; I use it to navigate to place I want to decide what to do.
I wanted to make a note that mentioned optimization is within pretty close
reach - so I might add it in this PR.
But I agree after the patch is comitted it might never get fixed - and an
issue tracker should be used.
Maybe some static check should ensure that no TODO / FIXME is added? That's
kinda the reason I see that the distinction between a draft and a normal pr
might be usefull; a `TODO` should not make the PR checks fail...but that would
be the case if I would add a checkstyle rule...
but I've went a little far :D I'll most likely address this before finishing
with it - or create a ticket :)
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
Review Comment:
outdate apidoc :facepalm:
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
+ {
+ this.aggFactories = aggFactories;
+ aggregators = new Aggregator[aggFactories.length];
+
+ rowIdProvider = new AtomicInteger(0);
+ columnSelectorFactory =
ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
+
+ newAggregators();
+ }
+
+ private void newAggregators()
+ {
+ for (int i = 0; i < aggFactories.length; i++) {
+ aggregators[i] = aggFactories[i].factorize(columnSelectorFactory);
+ }
+ }
+
+ /**
+ * Reposition aggregation window to reflect the given rows.
+ */
+ public void moveTo(Interval newRows)
+ {
+ // incremental addition of additional values
+ if (currentRows.a == newRows.a && currentRows.b < newRows.b) {
+ for (int i = currentRows.b; i < newRows.b; i++) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ return;
+ }
+ newAggregators();
+ for (int i : newRows) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ }
+
+ public void setOutputs(Object[][] results, Interval outputRows)
+ {
+ for (int aggIdx = 0; aggIdx < aggFactories.length; aggIdx++) {
+ Object aggValue = aggregators[aggIdx].get();
+ for (int rowIdx = outputRows.a; rowIdx < outputRows.b; rowIdx++) {
+ results[aggIdx][rowIdx] = aggValue;
+ }
Review Comment:
Yes; I was also thinking about this - but all the aggregates I've taken a
look have either:
* used a primitive type (`max` / etc)
* created some more complicated object
* done a copy(!) during return
[HllSketchBuildAggregator](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java#L64)
* ...but I've found at least one which was exposing its internals
[Kll](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchBuildAggregator.java#L42)
I think instead of introducing a complicated copy mechanisms possible
alternatives could be:
* either add to the documentation that `get()` in under any circumstances
**may not return internal parts of the `Aggregator`** ; and possibly cover it
with a test - so that existing stuff lives up to that contract
* `get()` is not documented [at all](https://github.com/apache/druid
/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java#L67)
* introduce a marker interface like `GetIsSafe` ; mark good
aggregators...and throw an exception here stating that the aggregator is
not-good-enough if its not present
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
Review Comment:
this class has a contract to be able to aggregate stuff on its own - for
that its a sideeffect that it needs to `own` a `rowIdProvider`
I don't want to give external control to that; as if that would be
accessible someone might use it; and/or expect it to be at the same point after
calling a method on this class...
##########
processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java:
##########
@@ -456,4 +459,140 @@ public void testReverseCumulativeAggregation()
.allColumnsRegistered()
.validate(results);
}
+
+
+
+ @Test
+ public void testRangeOrderBy()
+ {
+ WindowFrame frame =
WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1"));
+ int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2};
+ int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2};
+ int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 0,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+ int[] resVals = new int[] {0, 1, 3, 3, 5, 7, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeA1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 0,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+ int[] resVals = new int[] {1, 3, 5, 5, 7, 9, 5};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1A1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5};
+ int[] resVals = new int[] {1, 3, 6, 9, 12, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+
+ @Test
+ public void testRangeB1A1_2()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5};
+ int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5};
+ int[] resVals = new int[] {1, 1, 3, 6, 9, 9, 12, 12, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1A2()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 2,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
+ int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1};
+ int[] resVals = new int[] {13, 13, 13, 16, 16, 16, 12, 12, 12, 12, 8, 8,
8};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+
+ private void simpleWindowingTest(WindowFrame frame, int[] c1Vals, int[]
c2Vals, int[] resVals)
+ {
+ Map<String, Column> map = new LinkedHashMap<>();
+ map.put("c1", new IntArrayColumn(c1Vals));
+ map.put("c2", new IntArrayColumn(c2Vals));
+
+ RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
+
+ FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
+
+ final RowsAndColumns results = agger.aggregateAll(
+ frame,
+ new AggregatorFactory[] {
+ new LongSumAggregatorFactory("res", "c2")
+ }
+ );
+
+ new RowsAndColumnsHelper()
+ .expectColumn("c1", c1Vals)
+ .expectColumn("c2", c2Vals)
+ .expectColumn("res", resVals)
+ .allColumnsRegistered()
+ .validate(results);
+ }
Review Comment:
I've added one - and it just passed because it does a `copy` in the
[get()](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java#L64)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]