clintropolis commented on code in PR #14408:
URL: https://github.com/apache/druid/pull/14408#discussion_r1265849741
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java:
##########
@@ -125,6 +137,21 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}
+ @Override
+ public VectorAggregator factorizeVector(
+ VectorColumnSelectorFactory columnSelectorFactory
+ )
+ {
+ ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(fieldName);
+ if (capabilities.isNumeric()) {
Review Comment:
   I think you need to check for capabilities being null too. You should be
able to confirm this by adding a test for a column that doesn't exist (the
vector engine returns null capabilities when the column is missing).
##########
processing/src/main/java/org/apache/druid/query/UnnestDataSource.java:
##########
@@ -61,9 +61,10 @@ private UnnestDataSource(
DimFilter unnestFilter
)
{
- this.base = dataSource;
- this.virtualColumn = virtualColumn;
- this.unnestFilter = unnestFilter;
+ // select * from UNNEST(ARRAY[1,2,3]) as somu(d3) where somu.d3 IN
('a','b')
+ this.base = dataSource; // table
+ this.virtualColumn = virtualColumn; // MV_TO_ARRAY
+ this.unnestFilter = unnestFilter; // d3 in (a,b)
Review Comment:
   nit: these comments seem strange — did you mean to leave them here? Also,
isn't this change unrelated to this PR?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java:
##########
@@ -122,6 +128,25 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}
+ @Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory
columnSelectorFactory)
+ {
+ ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(fieldName);
+ if (capabilities.isNumeric()) {
Review Comment:
ditto null check
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java:
##########
@@ -154,6 +168,46 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}
+ @Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory
selectorFactory)
+ {
+ BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector)
selectorFactory.makeValueSelector(
+ timeColumn);
+ ColumnCapabilities capabilities =
selectorFactory.getColumnCapabilities(fieldName);
+ if (capabilities != null) {
+ if (capabilities.is(ValueType.STRING) &&
capabilities.isDictionaryEncoded().isTrue()) {
+ // Case 1: Multivalue string with dimension selector
+ if (capabilities.hasMultipleValues().isTrue()) {
+ if (isGetFirstElementFromMvd()) {
+ MultiValueDimensionVectorSelector mSelector =
selectorFactory.makeMultiValueDimensionSelector(
+ DefaultDimensionSpec.of(
+ fieldName));
+ return new MultiStringFirstDimensionVectorAggregator(timeSelector,
mSelector, maxStringBytes);
+ }
+ } else {
+ // Case 2: Single string with dimension selector
+ SingleValueDimensionVectorSelector sSelector =
selectorFactory.makeSingleValueDimensionSelector(
+ DefaultDimensionSpec.of(
+ fieldName));
+ return new SingleStringFirstDimensionVectorAggregator(timeSelector,
sSelector, maxStringBytes);
+ }
+ }
+ }
+ // Case 3: return vector object selector
+ VectorObjectSelector vSelector =
selectorFactory.makeObjectSelector(fieldName);
+ if (capabilities != null) {
+ return new StringFirstVectorAggregator(timeSelector, vSelector,
maxStringBytes);
+ } else {
+ return new StringFirstVectorAggregator(null, vSelector, maxStringBytes);
Review Comment:
   nit: it seems non-obvious that passing a null time selector turns
`StringFirstVectorAggregator` into a no-op aggregator — maybe leave a comment.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class StringFirstVectorAggregator implements VectorAggregator
+{
+ private static final SerializablePairLongString INIT = new
SerializablePairLongString(
+ DateTimes.MAX.getMillis(),
+ null
+ );
+ private final BaseLongVectorValueSelector timeSelector;
+ private final VectorObjectSelector valueSelector;
+ private final int maxStringBytes;
+
+
+ public StringFirstVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ VectorObjectSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ this.maxStringBytes = maxStringBytes;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ if (timeSelector == null) {
+ return;
+ }
+ long[] times = timeSelector.getLongVector();
+ Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+ long firstTime = buf.getLong(position);
+ int index;
+ for (int i = startRow; i < endRow; i++) {
+ if (times[i] > firstTime) {
+ break;
+ }
+ index = i;
+ final boolean foldNeeded =
StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);
+ if (foldNeeded) {
+ final SerializablePairLongString inPair =
StringFirstLastUtils.readPairFromVectorSelectorsAtIndex(
+ timeSelector,
+ valueSelector,
+ index
+ );
+ if (inPair != null) {
+ firstTime = buf.getLong(position);
+ if (inPair.lhs < firstTime) {
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(inPair.lhs, inPair.rhs),
+ maxStringBytes
+ );
+ }
+ }
+ } else {
+ final long time = times[index];
+ if (time < firstTime) {
+ final String value =
DimensionHandlerUtils.convertObjectToString(objectsWhichMightBeStrings[index]);
+ firstTime = time;
+ StringFirstLastUtils.writePair(
+ buf,
+ position,
+ new SerializablePairLongString(time, value),
+ maxStringBytes
+ );
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
+ {
+ long[] timeVector = timeSelector.getLongVector();
+ Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+
+ // iterate once over the object vector to find first non null element and
+ // determine if the type is Pair or not
+ boolean foldNeeded = false;
+ for (Object obj : objectsWhichMightBeStrings) {
+ if (obj == null) {
+ continue;
+ } else {
+ foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(obj);
+ break;
+ }
Review Comment:
   This seems like a lot of work just to figure out whether the input type is a
complex type or a string. See the other comment about potentially splitting this
into a separate aggregator, or at least detecting the type in the
factorizeVector method.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class MultiStringFirstDimensionVectorAggregator implements
VectorAggregator
+{
+ private final BaseLongVectorValueSelector timeSelector;
+ private final MultiValueDimensionVectorSelector valueDimensionVectorSelector;
+ private long firstTime;
+ private final int maxStringBytes;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+
+ public MultiStringFirstDimensionVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ MultiValueDimensionVectorSelector valueDimensionVectorSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueDimensionVectorSelector = valueDimensionVectorSelector;
+ this.maxStringBytes = maxStringBytes;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(
+ position + NumericFirstVectorAggregator.NULL_OFFSET,
+ useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
+ );
+ buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final IndexedInts[] valueVector =
valueDimensionVectorSelector.getRowVector();
+ firstTime = buf.getLong(position);
+ int index = startRow;
+ for (int i = startRow; i < endRow; i++) {
+ if (valueVector[i].get(0) != 0) {
+ index = i;
+ break;
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ buf.putLong(position, firstTime);
+ buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET,
valueVector[index].get(0));
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
+ {
+ long[] timeVector = timeSelector.getLongVector();
+ IndexedInts[] values = valueDimensionVectorSelector.getRowVector();
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int row = rows == null ? i : rows[i];
+ long firstTime = buf.getLong(position);
+ if (timeVector[row] < firstTime) {
+ firstTime = timeVector[row];
+ buf.putLong(position, firstTime);
+ buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(
+ position + NumericFirstVectorAggregator.VALUE_OFFSET,
+ values[row].size() > 0 ? values[row].get(0) : 0
+ );
+ }
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ int index = buf.getInt(position +
NumericFirstVectorAggregator.VALUE_OFFSET);
+ long earliest = buf.getLong(position);
+ String strValue = valueDimensionVectorSelector.lookupName(index);
+ return new SerializablePairLongString(earliest, StringUtils.chop(strValue,
maxStringBytes));
Review Comment:
   Can this be wrong in the case where nothing was aggregated and dictionary
id 0 is not null? It seems like we need to check the null byte here and return
null when it indicates null (otherwise it appears the result will be treated
as not null).
##########
processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.NoFilterVectorOffset;
+import org.apache.druid.segment.vector.ReadableVectorInspector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DoubleFirstVectorAggregationTest extends
InitializedNullHandlingTest
Review Comment:
these tests seem to have a flaw in that they only test one vector? I don't
have an example handy, but it seems like it would be nicer if the tests used a
cursor/offset and advanced through all of the rows to provide a more realistic
test case.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java:
##########
@@ -120,8 +132,10 @@ public StringFirstAggregatorFactory(
this.maxStringBytes = maxStringBytes == null
?
StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
: maxStringBytes;
+ this.getFirstElementFromMvd = false;
Review Comment:
so this cannot be set?
TBH i'm not really sure how intuitive this setting is, especially given that
the default value of `MultiValueHandling` is `SORTED_ARRAY` (unless that has
changed recently), so at ingest time the first value picked by this aggregator
would not be consistent with the first apparent value if we stored the column
directly, at least with the default configurations for string columns.
   If we do wire it up so it can be set, then the non-vectorized engine's
`factorize` and `factorizeBuffered` also need to honor it — it would be strange
for only the vector engine to support it — and the "last" family of aggregators
should also get this multi-value handling.
Maybe this functionality should be split out into a separate PR? Otherwise I
think this PR should be marked as design review to get consensus on this new
behavior.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
Review Comment:
   I think this isn't consistent with the non-vectorized aggs, or even with the
other `aggregate` function, which just takes the first timestamp it finds
regardless of whether the value is null. I would argue that this behavior is
probably better, or at least more SQL-compliant; on the other hand, we could
make the SQL planner add an explicit filtered-agg wrapper to filter out null
values, and then the native layer could have either behavior.
   Though it's not really intended for testing arbitrary aggs, something like
`SqlVectorizedExpressionSanityTest` might be useful to ensure that the
vectorized and non-vectorized implementations behave consistently.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class MultiStringFirstDimensionVectorAggregator implements
VectorAggregator
+{
+ private final BaseLongVectorValueSelector timeSelector;
+ private final MultiValueDimensionVectorSelector valueDimensionVectorSelector;
+ private long firstTime;
+ private final int maxStringBytes;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+
+ public MultiStringFirstDimensionVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ MultiValueDimensionVectorSelector valueDimensionVectorSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueDimensionVectorSelector = valueDimensionVectorSelector;
+ this.maxStringBytes = maxStringBytes;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(
+ position + NumericFirstVectorAggregator.NULL_OFFSET,
+ useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
+ );
+ buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final IndexedInts[] valueVector =
valueDimensionVectorSelector.getRowVector();
+ firstTime = buf.getLong(position);
+ int index = startRow;
+ for (int i = startRow; i < endRow; i++) {
+ if (valueVector[i].get(0) != 0) {
+ index = i;
+ break;
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ buf.putLong(position, firstTime);
+ buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET,
valueVector[index].get(0));
Review Comment:
   Curious: if we really are going to add the flag, why wouldn't the new flag
be pushed into this aggregator instead of being used to choose between this and
the object aggregator?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java:
##########
@@ -123,6 +129,24 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}
+ @Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory
columnSelectorFactory)
+ {
+ ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(fieldName);
+ if (capabilities.isNumeric()) {
Review Comment:
same comment re null check
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class MultiStringFirstDimensionVectorAggregator implements
VectorAggregator
+{
+ private final BaseLongVectorValueSelector timeSelector;
+ private final MultiValueDimensionVectorSelector valueDimensionVectorSelector;
+ private long firstTime;
+ private final int maxStringBytes;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+
+ public MultiStringFirstDimensionVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ MultiValueDimensionVectorSelector valueDimensionVectorSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueDimensionVectorSelector = valueDimensionVectorSelector;
+ this.maxStringBytes = maxStringBytes;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(
+ position + NumericFirstVectorAggregator.NULL_OFFSET,
+ useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
+ );
+ buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final IndexedInts[] valueVector =
valueDimensionVectorSelector.getRowVector();
+ firstTime = buf.getLong(position);
+ int index = startRow;
+ for (int i = startRow; i < endRow; i++) {
+ if (valueVector[i].get(0) != 0) {
Review Comment:
   Is this trying to check for null? If so, you need to actually check that
dictionary id 0 is null, not just that it's the first value of the dictionary.
If not, could you leave a comment explaining what is going on here?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
Review Comment:
   This is definitely true for now, but I can't help but wonder if this will
bite us in the future if we ever support the vector engine on other
orderings (even just descending would mess all of this up). It doesn't need
to be addressed in this PR since it's a much larger problem.
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class StringFirstVectorAggregator implements VectorAggregator
+{
+ private static final SerializablePairLongString INIT = new
SerializablePairLongString(
+ DateTimes.MAX.getMillis(),
+ null
+ );
+ private final BaseLongVectorValueSelector timeSelector;
+ private final VectorObjectSelector valueSelector;
+ private final int maxStringBytes;
+
+
+ public StringFirstVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ VectorObjectSelector valueSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ this.maxStringBytes = maxStringBytes;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ if (timeSelector == null) {
+ return;
+ }
+ long[] times = timeSelector.getLongVector();
+ Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+ long firstTime = buf.getLong(position);
+ int index;
+ for (int i = startRow; i < endRow; i++) {
+ if (times[i] > firstTime) {
+ break;
+ }
+ index = i;
+ final boolean foldNeeded =
StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);
+ if (foldNeeded) {
Review Comment:
   doing this inside the loop seems a bit odd...
   to reduce complexity, I wonder if we should split out a separate aggregator
to use when the column type is stringFirst instead of re-using the string
aggregator, and then make the aggregator factory choose the correct thing based
on the capabilities (assuming that stringLast shows up as a COMPLEX type)?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+ updateTimeWithValue(buf, position, firstTime, index);
+ } else {
+ updateTimeWithNull(buf, position, firstTime);
+ }
Review Comment:
   this is somewhat confusing: the other loop breaks if it finds a
non-null value, but here we can still write a null, I guess, if it made it
through the whole vector without breaking? That seems odd, since it means that
it finds the first non-null value in a vector, else it finds the last
timestamp in the first vector it reads?
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class MultiStringFirstDimensionVectorAggregator implements
VectorAggregator
+{
+ private final BaseLongVectorValueSelector timeSelector;
+ private final MultiValueDimensionVectorSelector valueDimensionVectorSelector;
+ private long firstTime;
+ private final int maxStringBytes;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+
+ public MultiStringFirstDimensionVectorAggregator(
+ BaseLongVectorValueSelector timeSelector,
+ MultiValueDimensionVectorSelector valueDimensionVectorSelector,
+ int maxStringBytes
+ )
+ {
+ this.timeSelector = timeSelector;
+ this.valueDimensionVectorSelector = valueDimensionVectorSelector;
+ this.maxStringBytes = maxStringBytes;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(
+ position + NumericFirstVectorAggregator.NULL_OFFSET,
+ useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
+ );
+ buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final IndexedInts[] valueVector =
valueDimensionVectorSelector.getRowVector();
+ firstTime = buf.getLong(position);
+ int index = startRow;
+ for (int i = startRow; i < endRow; i++) {
+ if (valueVector[i].get(0) != 0) {
+ index = i;
+ break;
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ buf.putLong(position, firstTime);
+ buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET,
valueVector[index].get(0));
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
+ {
+ long[] timeVector = timeSelector.getLongVector();
+ IndexedInts[] values = valueDimensionVectorSelector.getRowVector();
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int row = rows == null ? i : rows[i];
+ long firstTime = buf.getLong(position);
+ if (timeVector[row] < firstTime) {
+ firstTime = timeVector[row];
+ buf.putLong(position, firstTime);
+ buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
Review Comment:
   shouldn't this be checking for the value being null or not? Or is the
assumption that we never set the null bit here and instead translate it in the
get method? If that is the case, why do we need a null byte at all instead of
just storing a long and an int in the buffer? Or is it to distinguish the case
of 'aggregate' not being called from actually aggregating something? (e.g.
an empty group should probably always spit out a null value...)
##########
processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Class for vectorized version of first/earliest aggregator over numeric types
+ */
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+ static final int NULL_OFFSET = Long.BYTES;
+ static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+ final VectorValueSelector valueSelector;
+ private final boolean useDefault = NullHandling.replaceWithDefault();
+ private final VectorValueSelector timeSelector;
+ private long firstTime;
+
+ public NumericFirstVectorAggregator(VectorValueSelector timeSelector,
VectorValueSelector valueSelector)
+ {
+ this.timeSelector = timeSelector;
+ this.valueSelector = valueSelector;
+ firstTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, Long.MAX_VALUE);
+ buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE
: NullHandling.IS_NULL_BYTE);
+ initValue(buf, position + VALUE_OFFSET);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullValueVector = valueSelector.getNullVector();
+ boolean nullAbsent = false;
+ firstTime = buf.getLong(position);
+ // check if nullVector is found or not
+ // the nullVector is null if no null values are found
+ // set the nullAbsent flag accordingly
+ if (nullValueVector == null) {
+ nullAbsent = true;
+ }
+
+ // the time vector is already sorted so the first element would be the
earliest
+ // traverse accordingly
+ int index = startRow;
+ if (!useDefault && !nullAbsent) {
+ for (int i = startRow; i < endRow; i++) {
+ if (!nullValueVector[i]) {
+ index = i;
+ break;
+ }
+ }
+ }
+
+ final long earliestTime = timeVector[index];
+ if (earliestTime < firstTime) {
+ firstTime = earliestTime;
+ if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+ updateTimeWithValue(buf, position, firstTime, index);
+ } else {
+ updateTimeWithNull(buf, position, firstTime);
+ }
+ }
+ }
+
+ /**
+ *
+ * Checks if the aggregated value at a position in the buffer is null or not
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @return
+ */
+ boolean isValueNull(ByteBuffer buf, int position)
+ {
+ return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+ }
+
+ @Override
+ public void aggregate(
+ ByteBuffer buf,
+ int numRows,
+ int[] positions,
+ @Nullable int[] rows,
+ int positionOffset
+ )
+ {
+ boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
+ long[] timeVector = timeSelector.getLongVector();
+
+ for (int i = 0; i < numRows; i++) {
+ int position = positions[i] + positionOffset;
+ int row = rows == null ? i : rows[i];
+ long firstTime = buf.getLong(position);
+ if (timeVector[row] < firstTime) {
+ if (useDefault || nulls == null || !nulls[row]) {
+ updateTimeWithValue(buf, position, timeVector[row], row);
+ } else {
+ updateTimeWithNull(buf, position, timeVector[row]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Updates the time and the non null values to the appropriate position in
buffer
+ *
+ * @param buf byte buffer storing the byte array representation of
the aggregate
+ * @param position offset within the byte buffer at which the current
aggregate value is stored
+ * @param time the time to be updated in the buffer as the last time
+ * @param index the index of the vectorized vector which is the last
value
+ */
+ void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
+ {
+ buf.putLong(position, time);
+ buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
+ putValue(buf, position + VALUE_OFFSET, index);
+ }
Review Comment:
some thoughts - since the value portion of this is basically the same
behavior of `NullableTypeStrategy` where there is a byte to track nulls and
then the actual value bytes, I can't help but wonder if we could share some
more code between all of the first/last aggregators by letting them use a
`NullableTypeStrategy` for whatever the underlying selector type is. This
definitely doesn't need to be done in this PR, just thinking ahead for if we
supported additional types like arrays.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]