imply-cheddar commented on code in PR #15365:
URL: https://github.com/apache/druid/pull/15365#discussion_r1394472910
##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -123,6 +149,36 @@ public String toString()
", lowerOffset=" + lowerOffset +
", upperUnbounded=" + upperUnbounded +
", upperOffset=" + upperOffset +
+ ", orderBy=" + orderBy +
'}';
}
+
+ public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
+ {
+ return new WindowFrame(PeerType.RANGE, true, 0, false, 0,
Lists.newArrayList(orderBy));
+ }
+
+ public List<String> getOrderByColNames()
+ {
+ if (orderBy == null) {
+ return Collections.emptyList();
+ }
+ return
orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
+ }
+
+ public int getLowerOffsetClamped(int maxValue)
+ {
+ if (lowerUnbounded) {
+ return maxValue;
+ }
Review Comment:
Shouldn't this be returning 0? Is 0 getting passed in as `maxValue`?
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
Review Comment:
I don't remember if I've given you the commentary on TODO's in code yet. I
tend to do it once. So, I'll put it here:
A TODO comment actually checked into the primary code base is generally one
of the most useless comments existing in the code. They are usually written
so tersely that only the original author can understand them, and even then,
3-6 months after writing, the original author often doesn't even remember what
they were thinking when they wrote it. There are 3 reasons why a TODO is put
in the code:
1. Keeping track of work that you want to complete inside of the current PR.
These are TODOs that you are doing just to help keep track of stuff and that
you intend to resolve before merge. These are fine, just resolve them before
merge please.
2. A replacement for an issue tracker. These are TODOs that are written
because they are indicative of work that could or could not be done in the
future and thus needs prioritization/discussion before being actually
undertaken. This type of TODO should never survive, it should be moved to an
issue tracker or just completely deleted (when/if the thing is needed, people
will figure it out even without the TODO to tell them; most likely, they will
only discover the TODO after they already know what they are trying to do
anyway).
3. An attempt at providing a hint to some future developer about some cool
optimization that could be done to the code in the hopes that they understand
what the TODO means and can do the optimization. This is another TODO that
should never survive. Basically, the comment will never be written with enough
context to actually explain why the thing is a "TODO". There is never enough
context to know how high of a priority the TODO really is nor an ability for
whatever developer sees the comment to really think about it. As such, if you
want to keep this sort of comment around, you should rewrite it to have all of
this context and then remove the "TODO" words. Give the comment enough words
to speak for itself and then let it speak for itself.
Okay, now that I know I've talked about TODOs, please do the right thing
(whichever one it is) with this TODO.
Generally speaking (2) and (3)
##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -44,21 +49,35 @@ public enum PeerType
private final int lowerOffset;
private final boolean upperUnbounded;
private final int upperOffset;
+ private final List<ColumnWithDirection> orderBy;
+
+
+ public WindowFrame(
+ PeerType peerType,
+ boolean lowerUnbounded,
+ int lowerOffset,
+ boolean upperUnbounded,
+ int upperOffset)
+ {
+ this(peerType, lowerUnbounded, lowerOffset, upperUnbounded, upperOffset,
null);
+ }
Review Comment:
Why the new constructor? Overloading things can slowly spiral into lots and
lots of different call sites. Usually it's done to avoid changing other parts
of the code that are dependent on the fewer-argument version. Personally, when
I find myself wanting to overload because there are too many direct usages of a
constructor, I decide that it's time to make a builder and convert all of the
things to builders. You don't have to do that here, but it's worth considering.
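
To make the builder idea concrete, here is a minimal sketch. Everything in it is hypothetical: `FrameSketch` is a simplified stand-in for `WindowFrame`, the order-by columns are plain strings rather than `ColumnWithDirection`, and the builder method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for WindowFrame; not the real class.
final class FrameSketch
{
  enum PeerType { ROWS, RANGE }

  final PeerType peerType;
  final boolean lowerUnbounded;
  final int lowerOffset;
  final boolean upperUnbounded;
  final int upperOffset;
  final List<String> orderBy;

  // Single private constructor: all call sites go through the builder.
  private FrameSketch(Builder b)
  {
    this.peerType = b.peerType;
    this.lowerUnbounded = b.lowerUnbounded;
    this.lowerOffset = b.lowerOffset;
    this.upperUnbounded = b.upperUnbounded;
    this.upperOffset = b.upperOffset;
    this.orderBy = Collections.unmodifiableList(new ArrayList<>(b.orderBy));
  }

  static Builder builder(PeerType peerType)
  {
    return new Builder(peerType);
  }

  static final class Builder
  {
    private final PeerType peerType;
    private boolean lowerUnbounded = false;
    private int lowerOffset = 0;
    private boolean upperUnbounded = false;
    private int upperOffset = 0;
    private final List<String> orderBy = new ArrayList<>();

    private Builder(PeerType peerType)
    {
      this.peerType = peerType;
    }

    Builder lowerUnbounded()
    {
      this.lowerUnbounded = true;
      this.lowerOffset = 0;
      return this;
    }

    Builder lowerOffset(int offset)
    {
      this.lowerUnbounded = false;
      this.lowerOffset = offset;
      return this;
    }

    Builder upperUnbounded()
    {
      this.upperUnbounded = true;
      this.upperOffset = 0;
      return this;
    }

    Builder upperOffset(int offset)
    {
      this.upperUnbounded = false;
      this.upperOffset = offset;
      return this;
    }

    Builder orderBy(String column)
    {
      this.orderBy.add(column);
      return this;
    }

    FrameSketch build()
    {
      return new FrameSketch(this);
    }
  }
}
```

With something like this, adding a new field later means adding one builder method with a default, rather than another constructor overload and another round of call-site changes.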
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
+ {
+ this.aggFactories = aggFactories;
+ aggregators = new Aggregator[aggFactories.length];
+
+ rowIdProvider = new AtomicInteger(0);
+ columnSelectorFactory =
ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
+
+ newAggregators();
+ }
+
+ private void newAggregators()
+ {
+ for (int i = 0; i < aggFactories.length; i++) {
+ aggregators[i] = aggFactories[i].factorize(columnSelectorFactory);
+ }
+ }
+
+ /**
+ * Reposition aggregation window to reflect the given rows.
+ */
+ public void moveTo(Interval newRows)
+ {
+ // incremental addition of additional values
+ if (currentRows.a == newRows.a && currentRows.b < newRows.b) {
+ for (int i = currentRows.b; i < newRows.b; i++) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ return;
+ }
+ newAggregators();
+ for (int i : newRows) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ }
+
+ public void setOutputs(Object[][] results, Interval outputRows)
+ {
+ for (int aggIdx = 0; aggIdx < aggFactories.length; aggIdx++) {
+ Object aggValue = aggregators[aggIdx].get();
+ for (int rowIdx = outputRows.a; rowIdx < outputRows.b; rowIdx++) {
+ results[aggIdx][rowIdx] = aggValue;
+ }
Review Comment:
If, e.g., this method is called and `aggregate()` is then called again before
`newAggregators()` is called, it's totally acceptable (though not currently
documented on the `Aggregator` interface) for the object that `.get()` returned
to be mutated as part of the subsequent `aggregate()` call, which would change
values already stored in `results`. For a cumulative aggregation, you need to
do a `.get()` and then feed that as the first input back into a new Aggregator
object.
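
A toy illustration of the hazard, under loud assumptions: `CollectingAgg` below is a made-up aggregator, not Druid's `Aggregator` interface, and it deliberately hands out its internal mutable state from `get()` to show what a conforming implementation is allowed to do.

```java
import java.util.ArrayList;
import java.util.List;

final class AliasingSketch
{
  // Toy aggregator (hypothetical): get() returns the live internal list.
  static final class CollectingAgg
  {
    private final List<Integer> seen = new ArrayList<>();

    void aggregate(int value)
    {
      seen.add(value);
    }

    List<Integer> get()
    {
      return seen;
    }
  }

  // Reuses one aggregator and stores get() after every row.
  // Every stored result is the same list, so earlier results keep changing.
  static List<List<Integer>> cumulativeShared(int[] values)
  {
    CollectingAgg agg = new CollectingAgg();
    List<List<Integer>> results = new ArrayList<>();
    for (int v : values) {
      agg.aggregate(v);
      results.add(agg.get());
    }
    return results;
  }

  // Creates a fresh aggregator per step, seeded with the previous result.
  // Each stored result is owned by an aggregator that is never touched again.
  static List<List<Integer>> cumulativeSeeded(int[] values)
  {
    List<List<Integer>> results = new ArrayList<>();
    List<Integer> previous = new ArrayList<>();
    for (int v : values) {
      CollectingAgg agg = new CollectingAgg();
      for (int p : previous) {
        agg.aggregate(p);
      }
      agg.aggregate(v);
      previous = agg.get();
      results.add(previous);
    }
    return results;
  }
}
```

The seeded variant re-feeds the previous values one by one only because the toy has no combine step; the point is just that a result already written to the output is never mutated afterwards.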
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
+ {
+ this.aggFactories = aggFactories;
+ aggregators = new Aggregator[aggFactories.length];
+
+ rowIdProvider = new AtomicInteger(0);
+ columnSelectorFactory =
ColumnSelectorFactoryMaker.fromRAC(rac).make(rowIdProvider);
+
+ newAggregators();
+ }
+
+ private void newAggregators()
+ {
+ for (int i = 0; i < aggFactories.length; i++) {
+ aggregators[i] = aggFactories[i].factorize(columnSelectorFactory);
+ }
+ }
+
+ /**
+ * Reposition aggregation window to reflect the given rows.
+ */
+ public void moveTo(Interval newRows)
+ {
+ // incremental addition of additional values
+ if (currentRows.a == newRows.a && currentRows.b < newRows.b) {
+ for (int i = currentRows.b; i < newRows.b; i++) {
+ aggregate(i);
+ }
+ currentRows = newRows;
+ return;
+ }
Review Comment:
In reading this method, I ended up getting stuck a bit because I missed this
`return` statement. Given that this method is either one or the other, an
`if/else` would've likely saved me a bit of time in realizing that it was an
either-or branch.
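
Roughly the shape I have in mind, as a sketch only: `SumCellSketch` is a hypothetical stand-in for `AggCell` that sums an `int[]` instead of driving real aggregators, and it takes the interval as two ints instead of an `Interval`.

```java
// Hypothetical stand-in for AggCell; sums plain ints instead of using Aggregators.
final class SumCellSketch
{
  private final int[] values;
  private int start = 0; // inclusive
  private int end = 0;   // exclusive
  private long sum = 0;

  SumCellSketch(int[] values)
  {
    this.values = values;
  }

  // Reposition the window to [newStart, newEnd).
  void moveTo(int newStart, int newEnd)
  {
    if (start == newStart && end < newEnd) {
      // Same left edge, right edge grew: add only the new rows.
      for (int i = end; i < newEnd; i++) {
        sum += values[i];
      }
    } else {
      // Any other move: start over and aggregate the whole window.
      sum = 0;
      for (int i = newStart; i < newEnd; i++) {
        sum += values[i];
      }
    }
    // Shared tail, written once instead of once per branch.
    start = newStart;
    end = newEnd;
  }

  long get()
  {
    return sum;
  }
}
```

The two branches are visibly exclusive, and the bookkeeping that both paths need sits after the `if/else` rather than being duplicated before an early `return`.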
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
Review Comment:
Just a bit of a design thing. Passing in the RowsAndColumns to compute the
dependencies of this object makes for a bit of a weird structuring. It would
be better to make this depend on what it actually needs (the frame and the
`int[]`) and then implement based on that.
Having the constructor depend on what it actually needs sets this up to be
much more testable and also eliminates work in the constructor (which is
usually indicative of a bit of a code smell).
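
A minimal sketch of that structuring, with assumptions called out: `PeerGroupWindows`, `groupsBefore` and `groupsAfter` are hypothetical names, the boundaries array is assumed to hold the start row of each peer group plus a final entry equal to the row count, and windows are returned as plain `int[4]` arrays rather than `Range` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: depends only on the boundary array and the offsets.
final class PeerGroupWindows
{
  private final int[] boundaries;
  private final int groupsBefore;
  private final int groupsAfter;

  PeerGroupWindows(int[] boundaries, int groupsBefore, int groupsAfter)
  {
    if (boundaries.length < 1) {
      throw new IllegalArgumentException("boundaries must have at least one entry");
    }
    if (groupsBefore < 0 || groupsAfter < 0) {
      throw new IllegalArgumentException("offsets must be non-negative");
    }
    this.boundaries = boundaries;
    this.groupsBefore = groupsBefore;
    this.groupsAfter = groupsAfter;
  }

  // One entry per peer group: {outputStart, outputEnd, inputStart, inputEnd},
  // all as [start, end) row indexes.
  List<int[]> windows()
  {
    final int numGroups = boundaries.length - 1;
    final List<int[]> result = new ArrayList<>();
    for (int g = 0; g < numGroups; g++) {
      final int first = Math.max(0, g - groupsBefore);
      // Use long arithmetic so a very large groupsAfter cannot overflow.
      final int last = (int) Math.min((long) numGroups, (long) g + groupsAfter + 1);
      result.add(new int[]{boundaries[g], boundaries[g + 1], boundaries[first], boundaries[last]});
    }
    return result;
  }
}
```

A test can then hand in something like `new int[]{0, 3, 6, 10}` directly, without building a `RowsAndColumns` first; the caller computes the boundaries once and passes them in.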
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
Review Comment:
This isn't really "numRanges", it's "numRows - 1"?
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
Review Comment:
Don't have asserts please. They show up in JVM bytecode and have shown up
in profilers before.
If the assert is important enough, then just have a conditional and an
exception.
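
Something along these lines, as a sketch: the class and method names are hypothetical, and it throws a plain `IllegalArgumentException` only to stay self-contained; the project's own exception types would be the natural choice in the real code.

```java
// Hypothetical sketch of replacing the assert with an explicit check.
final class PeerTypeCheckSketch
{
  enum PeerType { ROWS, RANGE }

  static void requireRange(PeerType peerType)
  {
    // Always evaluated, regardless of whether the JVM runs with -ea.
    if (peerType != PeerType.RANGE) {
      throw new IllegalArgumentException("Expected a RANGE frame, got [" + peerType + "]");
    }
  }
}
```

Unlike an `assert`, this behaves the same in tests and in production, and the failure carries a message that says what was wrong.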
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
Review Comment:
The method names being used here don't really resonate with me. That is,
when I read this code, I don't get a clear picture of what the code is trying
to do. `.moveTo()` seems to be doing the work, but `.moveTo` makes me think
that I'm dealing with a cursor and I'm setting a pointer?
##########
sql/src/test/resources/calcite/tests/window/range_handling.sqlTest:
##########
@@ -0,0 +1,18 @@
+type: "operatorValidation"
+
+sql: |
+ SELECT
+ FLOOR(m1/3),
+ --DENSE_RANK() OVER (ORDER BY FLOOR(m1/3)),
Review Comment:
Remove the comments before merge?
##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java:
##########
@@ -96,4 +96,32 @@ public String toString()
", aggregations=" + Arrays.toString(aggregations) +
'}';
}
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(aggregations);
+ result = prime * result + Objects.hash(frame);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ WindowFramedAggregateProcessor other = (WindowFramedAggregateProcessor)
obj;
+ return Arrays.equals(aggregations, other.aggregations) &&
Objects.equals(frame, other.frame);
+ }
Review Comment:
Why did you need these? If it was for validating tests, the tests have been
depending on `validateEquivalent` instead of `equals` in order to validate just
the interesting bits.
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
Review Comment:
What are `U` and `V`? I see `outputRows` and `inputRows`?
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java:
##########
@@ -64,7 +64,7 @@ public class CalciteWindowQueryTest extends
BaseCalciteQueryTest
public static final boolean DUMP_ACTUAL_RESULTS = Boolean.parseBoolean(
System.getProperty("druid.tests.sql.dumpActualResults")
- );
+ ) || true;
Review Comment:
I'm thinking that you added this just for simplicity while developing and
meant to remove this before merge?
##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -85,10 +88,234 @@ public RowsAndColumns aggregateAll(
}
}
} else {
- throw new UOE("RANGE peer groupings are unsupported");
+ return computeRangeAggregates(aggFactories, frame);
+ }
+ }
+
+ private RowsAndColumns computeRangeAggregates(
+ AggregatorFactory[] aggFactories,
+ WindowFrame frame)
+ {
+ RangeIteratorForWindow iter = new RangeIteratorForWindow(rac, frame);
+
+ int numRows = rac.numRows();
+ Object[][] results = new Object[aggFactories.length][numRows];
+
+ AggCell cell = new AggCell(rac, aggFactories);
+
+ for (Range xRange : iter) {
+
+ cell.moveTo(xRange.inputRows);
+ // TODO: if(xRange.outputRows.a ==0 && xRange.outputRows.b == numRows) {
return Const };
+
+ // note: would be better with results.setX()?
+ cell.setOutputs(results, xRange.outputRows);
}
+ return makeReturnRAC(aggFactories, results);
}
+ static class RangeIteratorForWindow implements Iterable<Range>
+ {
+ private final int[] rangeToRowId;
+ private final int numRows;
+ private final int numRanges;
+ private final int lowerOffset;
+ private final int upperOffset;
+
+ public RangeIteratorForWindow(AppendableRowsAndColumns rac, WindowFrame
frame)
+ {
+ assert (frame.getPeerType() == PeerType.RANGE);
+ rangeToRowId =
ClusteredGroupPartitioner.fromRAC(rac).computeBoundaries(frame.getOrderByColNames());
+ numRows = rac.numRows();
+ numRanges = rangeToRowId.length - 1;
+ lowerOffset = frame.getLowerOffsetClamped(numRanges);
+ upperOffset = frame.getUpperOffsetClamped(numRanges) + 1;
+ }
+
+ @Override
+ public Iterator<Range> iterator()
+ {
+ return new Iterator<Range>()
+ {
+ int currentRowIndex = 0;
+ int currentRangeIndex = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentRowIndex < numRows;
+ }
+
+ @Override
+ public Range next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ // TODO: invert listing order at the end to get benefits of
incremenental aggregations
+ Range r = new Range(
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(0)),
+ rangeToRowIndex(relativeRangeId(1))
+ ),
+ Interval.of(
+ rangeToRowIndex(relativeRangeId(-lowerOffset)),
+ rangeToRowIndex(relativeRangeId(upperOffset))
+ )
+ );
+
+ currentRowIndex = rangeToRowIndex(currentRangeIndex + 1);
+ currentRangeIndex++;
+ return r;
+ }
+
+ private int rangeToRowIndex(int rangeId)
+ {
+ return rangeToRowId[rangeId];
+ }
+
+ private int relativeRangeId(int rangeOffset)
+ {
+ int rangeId = currentRangeIndex + rangeOffset;
+ if (rangeId < 0) {
+ return 0;
+ }
+ if (rangeId >= numRanges) {
+ return numRanges;
+ }
+ return rangeId;
+ }
+ };
+ }
+ }
+
+
+ /**
+ * Basic [a,b) interval; left inclusive/right exclusive.
+ */
+ static class Interval implements Iterable<Integer>
+ {
+ int a;
+ int b;
+
+ public static Interval of(int u, int v)
+ {
+ return new Interval(u, v);
+ }
+
+ public Interval(int u, int v)
+ {
+ this.a = u;
+ this.b = v;
+ }
+
+ @Override
+ public Iterator<Integer> iterator()
+ {
+ return new Iterator<Integer>()
+ {
+ int current = a;
+
+ @Override
+ public Integer next()
+ {
+ if (!hasNext()) {
+ throw new IllegalStateException();
+ }
+ return current++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return current < b;
+ }
+ };
+
+ }
+ }
+
+ /**
+ * Represents a range between U (inclusive) and V (exclusive).
+ */
+ static class Range
+ {
+ Interval outputRows;
+ Interval inputRows;
+
+ public Range(Interval outputRows, Interval inputRows)
+ {
+ this.outputRows = outputRows;
+ this.inputRows = inputRows;
+ }
+ }
+
+ static class AggCell
+ {
+ private AggregatorFactory[] aggFactories;
+ Interval currentRows = new Interval(0, 0);
+ private final AtomicInteger rowIdProvider;
+ private final ColumnSelectorFactory columnSelectorFactory;
+
+ private final Aggregator[] aggregators;
+
+
+ AggCell(AppendableRowsAndColumns rac, AggregatorFactory[] aggFactories)
Review Comment:
Instead of having this depend on an `AppendableRowsAndColumns`, I'd suggest
making it depend on a `ColumnSelectorFactoryMaker`. On top of that, if it
were me, I'd write a static creator method and have this constructor take
the `AtomicInteger, ColumnSelectorFactory, AggregatorFactory[]` tuple.
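
A rough sketch of the shape I have in mind, with heavy caveats: every type below (`SketchSelectorFactory`, `SketchSelectorFactoryMaker`, `SketchAggregator`, `SketchAggregatorFactory`, `AggCellSketch`) is a hypothetical stand-in so the example compiles on its own. None of them is the real Druid interface, and the helper methods `arrayBacked` and `longSum` exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for Druid's ColumnSelectorFactory, ColumnSelectorFactoryMaker,
// Aggregator and AggregatorFactory. They only mimic the shape needed for this sketch.
interface SketchSelectorFactory
{
  long getLong(String column);
}

interface SketchSelectorFactoryMaker
{
  SketchSelectorFactory make(AtomicInteger rowIdProvider);
}

interface SketchAggregator
{
  void aggregate();

  Object get();
}

interface SketchAggregatorFactory
{
  SketchAggregator factorize(SketchSelectorFactory selectors);
}

final class AggCellSketch
{
  private final AtomicInteger rowIdProvider;
  private final SketchSelectorFactory selectors;
  private final SketchAggregatorFactory[] aggFactories;

  // The constructor takes the already-built tuple, so tests can pass in fakes directly.
  AggCellSketch(AtomicInteger rowIdProvider, SketchSelectorFactory selectors, SketchAggregatorFactory[] aggFactories)
  {
    this.rowIdProvider = rowIdProvider;
    this.selectors = selectors;
    this.aggFactories = aggFactories;
  }

  // Static creator: the only place that knows how the selector factory gets built.
  static AggCellSketch create(SketchSelectorFactoryMaker maker, SketchAggregatorFactory[] aggFactories)
  {
    AtomicInteger rowIdProvider = new AtomicInteger(0);
    return new AggCellSketch(rowIdProvider, maker.make(rowIdProvider), aggFactories);
  }

  // Aggregates rows [from, to) with fresh aggregators and returns one value per factory.
  Object[] aggregateRows(int from, int to)
  {
    SketchAggregator[] aggs = new SketchAggregator[aggFactories.length];
    for (int i = 0; i < aggs.length; i++) {
      aggs[i] = aggFactories[i].factorize(selectors);
    }
    for (int row = from; row < to; row++) {
      rowIdProvider.set(row);
      for (SketchAggregator agg : aggs) {
        agg.aggregate();
      }
    }
    Object[] results = new Object[aggs.length];
    for (int i = 0; i < aggs.length; i++) {
      results[i] = aggs[i].get();
    }
    return results;
  }

  // Illustration-only helper: a maker backed by a single long[] column (column name is ignored).
  static SketchSelectorFactoryMaker arrayBacked(long[] values)
  {
    return rowId -> column -> values[rowId.get()];
  }

  // Illustration-only helper: a long-sum factory over the named column.
  static SketchAggregatorFactory longSum(String column)
  {
    return selectors -> new SketchAggregator()
    {
      private long sum = 0;

      @Override
      public void aggregate()
      {
        sum += selectors.getLong(column);
      }

      @Override
      public Object get()
      {
        return sum;
      }
    };
  }

  public static void main(String[] args)
  {
    long[] c2 = {1, 1, 2, 1, 1, 2};
    AggCellSketch cell = create(arrayBacked(c2), new SketchAggregatorFactory[]{longSum("c2")});
    System.out.println(cell.aggregateRows(0, 3)[0]); // prints 4
  }
}
```

The point of the split is that the class no longer needs to know where its rows come from: the creator does the wiring once, and the constructor can be exercised in a test with any fake selector factory.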
##########
processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java:
##########
@@ -456,4 +459,140 @@ public void testReverseCumulativeAggregation()
.allColumnsRegistered()
.validate(results);
}
+
+
+
+ @Test
+ public void testRangeOrderBy()
+ {
+ WindowFrame frame = WindowFrame.forOrderBy(ColumnWithDirection.ascending("c1"));
+ int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2};
+ int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2};
+ int[] resVals = new int[] {4, 4, 4, 8, 8, 8, 13, 13, 13, 13};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 0,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+ int[] resVals = new int[] {0, 1, 3, 3, 5, 7, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeA1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 0,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 1, 1, 3, 4, 5};
+ int[] resVals = new int[] {1, 3, 5, 5, 7, 9, 5};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1A1()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 1, 2, 3, 4, 5};
+ int[] c2Vals = new int[] {0, 1, 2, 3, 4, 5};
+ int[] resVals = new int[] {1, 3, 6, 9, 12, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+
+ @Test
+ public void testRangeB1A1_2()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 1,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 0, 1, 2, 3, 3, 4, 4, 5};
+ int[] c2Vals = new int[] {0, 0, 1, 2, 2, 1, 2, 2, 5};
+ int[] resVals = new int[] {1, 1, 3, 6, 9, 9, 12, 12, 9};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+ @Test
+ public void testRangeB1A2()
+ {
+ WindowFrame frame = new WindowFrame(
+ PeerType.RANGE,
+ false,
+ 1,
+ false,
+ 2,
+ Collections.singletonList(ColumnWithDirection.ascending("c1"))
+ );
+
+ int[] c1Vals = new int[] {0, 0, 0, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
+ int[] c2Vals = new int[] {1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1};
+ int[] resVals = new int[] {13, 13, 13, 16, 16, 16, 12, 12, 12, 12, 8, 8, 8};
+
+ simpleWindowingTest(frame, c1Vals, c2Vals, resVals);
+ }
+
+
+ private void simpleWindowingTest(WindowFrame frame, int[] c1Vals, int[] c2Vals, int[] resVals)
+ {
+ Map<String, Column> map = new LinkedHashMap<>();
+ map.put("c1", new IntArrayColumn(c1Vals));
+ map.put("c2", new IntArrayColumn(c2Vals));
+
+ RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(map));
+
+ FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
+
+ final RowsAndColumns results = agger.aggregateAll(
+ frame,
+ new AggregatorFactory[] {
+ new LongSumAggregatorFactory("res", "c2")
+ }
+ );
+
+ new RowsAndColumnsHelper()
+ .expectColumn("c1", c1Vals)
+ .expectColumn("c2", c2Vals)
+ .expectColumn("res", resVals)
+ .allColumnsRegistered()
+ .validate(results);
+ }
Review Comment:
It might be good to also have some tests using the HLL aggregators. I
expect them to be a good way to exercise the "object gets mutated after
`.get()`" problem that I mentioned in a different comment.
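
To illustrate the kind of failure such a test would likely catch, here is a minimal sketch. It assumes the concern is aliasing: an aggregator whose `get()` returns its live internal object, which later `aggregate()` calls keep changing. `LiveStateAggregator` and `MutationAfterGetSketch` are hypothetical names, and a `HashSet` stands in for the sketch object; this is not the HLL aggregator API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a sketch-style aggregator: get() returns its live internal state.
final class LiveStateAggregator
{
  private final Set<Integer> seen = new HashSet<>();

  void aggregate(int value)
  {
    seen.add(value);
  }

  // Hands out the same mutable object on every call.
  Set<Integer> get()
  {
    return seen;
  }

  // Defensive copy: the returned set is unaffected by later aggregate() calls.
  Set<Integer> snapshot()
  {
    return new HashSet<>(seen);
  }
}

final class MutationAfterGetSketch
{
  // Running distinct count per row; each row stores whatever the aggregator handed out.
  static int[] runningDistinct(int[] values, boolean copyOnGet)
  {
    LiveStateAggregator agg = new LiveStateAggregator();
    List<Set<Integer>> perRow = new ArrayList<>();
    for (int value : values) {
      agg.aggregate(value);
      perRow.add(copyOnGet ? agg.snapshot() : agg.get());
    }
    int[] sizes = new int[values.length];
    for (int i = 0; i < sizes.length; i++) {
      sizes[i] = perRow.get(i).size();
    }
    return sizes;
  }

  public static void main(String[] args)
  {
    int[] values = {1, 2, 2, 3};
    System.out.println(Arrays.toString(runningDistinct(values, false))); // prints [3, 3, 3, 3]
    System.out.println(Arrays.toString(runningDistinct(values, true)));  // prints [1, 2, 2, 3]
  }
}
```

With a long sum the stored result is an immutable boxed value, so the existing tests cannot see this; an object-valued aggregator makes the difference between the two runs visible.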