kgyrtkirk commented on code in PR #15365:
URL: https://github.com/apache/druid/pull/15365#discussion_r1400750724
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
+ {
+ this.aggFactories = aggFactories;
+ aggregators = new Aggregator[aggFactories.length];
+
+ rowIdProvider = new AtomicInteger(0);
+ columnSelectorFactory =
ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
+
+ newAggregators();
+ }
+
+ private void newAggregators()
+ {
+ for (int i = 0; i < aggFactories.length; i++) {
+ aggregators[i] = aggFactories[i].factorize(columnSelectorFactory);
+ }
+ }
+
+ /**
+ * Reposition aggregation window to reflect the given rows.
+ */
+ public void moveTo(Interval newRows)
+ {
+ // incremental addition of additional values
+ if (currentRows.a == newRows.a && currentRows.b < newRows.b) {
+ for (int i = currentRows.b; i < newRows.b; i++) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ return;
+ }
+ newAggregators();
+ for (int i : newRows) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ }
+
+ public void setOutputs(Object[][] results, Interval outputRows)
+ {
+ for (int aggIdx = 0; aggIdx < aggFactories.length; aggIdx++) {
+ Object aggValue = aggregators[aggIdx].get();
+ for (int rowIdx = outputRows.a; rowIdx < outputRows.b; rowIdx++) {
+ results[aggIdx][rowIdx] = aggValue;
+ }
Review Comment:
Yes; I was also thinking about this - but all the aggregators I've taken a
look at have either:
* used a primitive type (`max` / etc)
* created some more complicated object
* done a copy(!) during return
[HllSketchBuildAggregator](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java#L64)
* ...but I've found at least one which was exposing its internals
[Kll](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchBuildAggregator.java#L42)
I think instead of introducing a complicated copy mechanism, possible
alternatives could be:
* either add to the documentation that `get()` **may not, under any
circumstances, return internal parts of the `Aggregator`**; and possibly cover it
with a test - so that existing stuff lives up to that contract
* `get()` is not documented [at
all](https://github.com/apache/druid/blob/dff5bcb0a6c693e6fc8131e91c5029dbab132fa1/processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java#L67)
* or introduce a marker interface like `GetIsSafe`; mark good
aggregators... and throw an exception here stating that the aggregator is
not good enough if the marker is not present
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]