clintropolis commented on code in PR #14408:
URL: https://github.com/apache/druid/pull/14408#discussion_r1236456703
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java:
##########
@@ -125,6 +138,23 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}
+ @Override
+ public VectorAggregator factorizeVector(
+ VectorColumnSelectorFactory columnSelectorFactory
+ )
+ {
+ ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(fieldName);
+ VectorValueSelector valueSelector =
columnSelectorFactory.makeValueSelector(fieldName);
+ //time is always long
+ BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector)
columnSelectorFactory.makeValueSelector(
+ timeColumn);
+ if (capabilities == null || capabilities.isNumeric()) {
Review Comment:
In the vectorized engine, null capabilities mean the column doesn't exist,
so I think you can use the nil aggregator here?
##########
benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java:
##########
@@ -205,7 +205,7 @@ public String getFormatString()
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2,
SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 37: time shift + expr agg (group by), uniform distribution high
cardinality
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2,
SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
- // 38: LATEST aggregator
+ // 38: LATEST aggregator long
"SELECT LATEST(long1) FROM foo",
Review Comment:
nit: FWIW, these benchmarks were primarily meant for testing vectorized
expression virtual columns; `SqlBenchmark` is the general-purpose place for
measuring things. That said, these don't hurt being here, and they carry a bit
less baggage than `SqlBenchmark`.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+ updateTimeWithValue(buf, position, firstTime, index);
+ } else {
+ updateTimeWithNull(buf, position, firstTime);
+ }
+ }
+ }
+
+ /**
+ *
+ * Checks if the aggregated value at a position in the buffer is null or not
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @return
+ */
+ boolean isValueNull(ByteBuffer buf, int position)
+ {
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+ }
+
+ @Override
+ public void aggregate(
+ ByteBuffer buf,
+ int numRows,
+ int[] positions,
+ @Nullable int[] rows,
+ int positionOffset
+ )
+ {
+ boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
+ long[] timeVector = timeSelector.getLongVector();
+
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int row = rows == null ? i : rows[i];
+ long firstTime = buf.getLong(position);
+ if (timeVector[row] < firstTime) {
+ if (useDefault || nulls == null || !nulls[row]) {
+ updateTimeWithValue(buf, position, timeVector[row], row);
+ } else {
+ updateTimeWithNull(buf, position, timeVector[row]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Updates the time and the non null values to the appropriate position in
buffer
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @param time the time to be updated in the buffer as the last time
+ * @param index the index of the vectorized vector which is the last
value
+ */
+ void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
+ {
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
+ putValue(buf, position + VALUE_OFFSET, index);
+ }
+
+ /**
+ *Updates the time only to the appropriate position in buffer as the value
is null
Review Comment:
nit: formatting (missing space after `*`)
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+ updateTimeWithValue(buf, position, firstTime, index);
+ } else {
+ updateTimeWithNull(buf, position, firstTime);
+ }
+ }
+ }
+
+ /**
+ *
+ * Checks if the aggregated value at a position in the buffer is null or not
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @return
+ */
+ boolean isValueNull(ByteBuffer buf, int position)
+ {
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+ }
+
+ @Override
+ public void aggregate(
+ ByteBuffer buf,
+ int numRows,
+ int[] positions,
+ @Nullable int[] rows,
+ int positionOffset
+ )
+ {
+ boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
+ long[] timeVector = timeSelector.getLongVector();
+
+ for (int i = 0; i < numRows; i++) {
Review Comment:
Since this is a hot loop, it might be worth splitting it into two separate loops
for the 'has a null vector' and 'doesn't have a null vector' cases, though it's
worth measuring to see whether that makes a difference.
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java:
##########
@@ -14721,4 +14743,39 @@ public void testFilterWithNVLAndNotIn()
)
);
}
+
+ @Test
+ public void testEarliestVectorAggregators()
Review Comment:
Same comment as on `testOffHeapEarliestGroupBy`: this test may be redundant.
##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java:
##########
@@ -1374,12 +1367,46 @@ public void testStringAnyInSubquery()
);
}
+ @Test
+ public void testOffHeapEarliestGroupBy()
Review Comment:
This seems to already be covered by the other tests that had their
`skipVectorize` statements removed?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+ updateTimeWithValue(buf, position, firstTime, index);
+ } else {
+ updateTimeWithNull(buf, position, firstTime);
+ }
+ }
+ }
+
+ /**
+ *
+ * Checks if the aggregated value at a position in the buffer is null or not
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @return
+ */
+ boolean isValueNull(ByteBuffer buf, int position)
+ {
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+ }
+
+ @Override
+ public void aggregate(
+ ByteBuffer buf,
+ int numRows,
+ int[] positions,
+ @Nullable int[] rows,
+ int positionOffset
+ )
+ {
+ boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
+ long[] timeVector = timeSelector.getLongVector();
+
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int row = rows == null ? i : rows[i];
+ long firstTime = buf.getLong(position);
+ if (timeVector[row] < firstTime) {
+ if (useDefault || nulls == null || !nulls[row]) {
+ updateTimeWithValue(buf, position, timeVector[row], row);
+ } else {
+ updateTimeWithNull(buf, position, timeVector[row]);
+ }
Review Comment:
The docs seem to indicate that we pick the first non-null value; however,
looking at the non-vectorized aggregator, it looks like we just pick the first
value, which is also what we are doing here.
I guess allowing the native aggregator to pick the first value even if it is
null is a bit more expressive than always ignoring null values, since we could
always wrap it in a filtered aggregator (I vaguely remember having this exact
discussion years ago for #9161). On the other hand, it doesn't seem like very
typical behavior for SQL, which usually ignores null values for most aggregation
functions. (The 'any' aggregator behaves consistently with this too: it will
return any value, including null.)
I wonder if we should either change the SQL conversion to always wrap this in a
filtered aggregator that removes nulls, or update the documentation to say that
this function returns null if the value in the earliest row is null.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
+{
+ long firstValue;
Review Comment:
Any reason these are fields instead of just local variables?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]