This is an automated email from the ASF dual-hosted git repository.
soumyava pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5c42ac8c4db Fix for latest agg to handle nulls in time column. Also
adding optimization for dictionary-encoded string columns (#14911)
5c42ac8c4db is described below
commit 5c42ac8c4dbbc8fa34ab3910145847cea2e49ec1
Author: Soumyava <[email protected]>
AuthorDate: Wed Sep 13 17:37:26 2023 -0700
Fix for latest agg to handle nulls in time column. Also adding optimization
for dictionary-encoded string columns (#14911)
* Fix for latest agg to handle nulls in time column. Also adding
optimization for dictionary-encoded string columns
* One minor fix
* Adding more tests for the new SingleStringLastDimensionVectorAggregator class
* Changing init to write the value slot with putInt instead of putLong
---
...SingleStringFirstDimensionVectorAggregator.java | 2 +-
...SingleStringLastDimensionVectorAggregator.java} | 51 +++++-----
.../last/StringLastAggregatorFactory.java | 22 ++++-
.../last/StringLastVectorAggregator.java | 20 ++--
.../last/StringLastVectorAggregatorTest.java | 109 ++++++++++++++++++++-
5 files changed, 162 insertions(+), 42 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
index 22fa50ea462..119e13464a0 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
@@ -57,7 +57,7 @@ public class SingleStringFirstDimensionVectorAggregator
implements VectorAggrega
position + NumericFirstVectorAggregator.NULL_OFFSET,
useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
);
- buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
similarity index 70%
copy from
processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
copy to
processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
index 22fa50ea462..6b39088faa2 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/SingleStringLastDimensionVectorAggregator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.query.aggregation.first;
+package org.apache.druid.query.aggregation.last;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
@@ -29,15 +29,15 @@ import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-public class SingleStringFirstDimensionVectorAggregator implements
VectorAggregator
+public class SingleStringLastDimensionVectorAggregator implements
VectorAggregator
{
private final VectorValueSelector timeSelector;
private final SingleValueDimensionVectorSelector
valueDimensionVectorSelector;
- private long firstTime;
+ private long lastTime;
private final int maxStringBytes;
private final boolean useDefault = NullHandling.replaceWithDefault();
- public SingleStringFirstDimensionVectorAggregator(
+ public SingleStringLastDimensionVectorAggregator(
VectorValueSelector timeSelector,
SingleValueDimensionVectorSelector valueDimensionVectorSelector,
int maxStringBytes
@@ -46,18 +46,18 @@ public class SingleStringFirstDimensionVectorAggregator
implements VectorAggrega
this.timeSelector = timeSelector;
this.valueDimensionVectorSelector = valueDimensionVectorSelector;
this.maxStringBytes = maxStringBytes;
- this.firstTime = Long.MAX_VALUE;
+ this.lastTime = Long.MIN_VALUE;
}
@Override
public void init(ByteBuffer buf, int position)
{
- buf.putLong(position, Long.MAX_VALUE);
+ buf.putLong(position, Long.MIN_VALUE);
buf.put(
- position + NumericFirstVectorAggregator.NULL_OFFSET,
+ position + NumericLastVectorAggregator.NULL_OFFSET,
useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
);
- buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
+ buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET, 0);
}
@Override
@@ -66,20 +66,20 @@ public class SingleStringFirstDimensionVectorAggregator
implements VectorAggrega
final long[] timeVector = timeSelector.getLongVector();
final boolean[] nullTimeVector = timeSelector.getNullVector();
final int[] valueVector = valueDimensionVectorSelector.getRowVector();
- firstTime = buf.getLong(position);
+ lastTime = buf.getLong(position);
int index;
- long earliestTime;
- for (index = startRow; index < endRow; index++) {
+ long latestTime;
+ for (index = endRow - 1; index >= startRow; index--) {
if (nullTimeVector != null && nullTimeVector[index]) {
continue;
}
- earliestTime = timeVector[index];
- if (earliestTime < firstTime) {
- firstTime = earliestTime;
- buf.putLong(position, firstTime);
- buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
- buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET,
valueVector[index]);
+ latestTime = timeVector[index];
+ if (latestTime > lastTime) {
+ lastTime = latestTime;
+ buf.putLong(position, lastTime);
+ buf.put(position + NumericLastVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET,
valueVector[index]);
}
}
}
@@ -90,28 +90,27 @@ public class SingleStringFirstDimensionVectorAggregator
implements VectorAggrega
final long[] timeVector = timeSelector.getLongVector();
final boolean[] nullTimeVector = timeSelector.getNullVector();
final int[] values = valueDimensionVectorSelector.getRowVector();
- for (int i = 0; i < numRows; i++) {
+ for (int i = numRows - 1; i >= 0; i--) {
if (nullTimeVector != null && nullTimeVector[i]) {
continue;
}
int position = positions[i] + positionOffset;
int row = rows == null ? i : rows[i];
- long firstTime = buf.getLong(position);
- if (timeVector[row] < firstTime) {
- firstTime = timeVector[row];
- buf.putLong(position, firstTime);
- buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
- buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET,
values[row]);
+ lastTime = buf.getLong(position);
+ if (timeVector[row] > lastTime) {
+ lastTime = timeVector[row];
+ buf.putLong(position, lastTime);
+ buf.put(position + NumericLastVectorAggregator.NULL_OFFSET,
NullHandling.IS_NOT_NULL_BYTE);
+ buf.putInt(position + NumericLastVectorAggregator.VALUE_OFFSET,
values[row]);
}
}
-
}
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
- int index = buf.getInt(position +
NumericFirstVectorAggregator.VALUE_OFFSET);
+ int index = buf.getInt(position +
NumericLastVectorAggregator.VALUE_OFFSET);
long earliest = buf.getLong(position);
String strValue = valueDimensionVectorSelector.lookupName(index);
return new SerializablePairLongString(earliest, StringUtils.chop(strValue,
maxStringBytes));
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index 234f03be3f9..909b7d4971e 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -35,6 +35,7 @@ import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
import org.apache.druid.query.aggregation.first.StringFirstLastUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
@@ -43,6 +44,8 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
@@ -160,6 +163,7 @@ public class StringLastAggregatorFactory extends
AggregatorFactory
final ColumnCapabilities capabilities =
selectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector timeSelector =
selectorFactory.makeValueSelector(timeColumn);
+
if (Types.isNumeric(capabilities)) {
VectorValueSelector valueSelector =
selectorFactory.makeValueSelector(fieldName);
VectorObjectSelector objectSelector =
ExpressionVectorSelectors.castValueSelectorToObject(
@@ -171,6 +175,18 @@ public class StringLastAggregatorFactory extends
AggregatorFactory
);
return new StringLastVectorAggregator(timeSelector, objectSelector,
maxStringBytes);
}
+
+ if (capabilities != null) {
+ if (capabilities.is(ValueType.STRING) &&
capabilities.isDictionaryEncoded().isTrue()) {
+ if (!capabilities.hasMultipleValues().isTrue()) {
+ SingleValueDimensionVectorSelector sSelector =
selectorFactory.makeSingleValueDimensionSelector(
+ DefaultDimensionSpec.of(
+ fieldName));
+ return new SingleStringLastDimensionVectorAggregator(timeSelector,
sSelector, maxStringBytes);
+ }
+ }
+ }
+
VectorObjectSelector vSelector =
selectorFactory.makeObjectSelector(fieldName);
if (capabilities != null) {
return new StringLastVectorAggregator(timeSelector, vSelector,
maxStringBytes);
@@ -296,9 +312,9 @@ public class StringLastAggregatorFactory extends
AggregatorFactory
}
StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
return maxStringBytes == that.maxStringBytes &&
- Objects.equals(fieldName, that.fieldName) &&
- Objects.equals(timeColumn, that.timeColumn) &&
- Objects.equals(name, that.name);
+ Objects.equals(fieldName, that.fieldName) &&
+ Objects.equals(timeColumn, that.timeColumn) &&
+ Objects.equals(name, that.name);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
index a18a1d4c963..00e70c78098 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java
@@ -64,8 +64,9 @@ public class StringLastVectorAggregator implements
VectorAggregator
if (timeSelector == null) {
return;
}
- long[] times = timeSelector.getLongVector();
- Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+ final long[] times = timeSelector.getLongVector();
+ final boolean[] nullTimeVector = timeSelector.getNullVector();
+ final Object[] objectsWhichMightBeStrings =
valueSelector.getObjectVector();
lastTime = buf.getLong(position);
int index;
@@ -76,6 +77,9 @@ public class StringLastVectorAggregator implements
VectorAggregator
if (times[i] <= lastTime) {
continue;
}
+ if (nullTimeVector != null && nullTimeVector[i]) {
+ continue;
+ }
index = i;
final boolean foldNeeded =
StringFirstLastUtils.objectNeedsFoldCheck(objectsWhichMightBeStrings[index]);
if (foldNeeded) {
@@ -127,22 +131,24 @@ public class StringLastVectorAggregator implements
VectorAggregator
if (timeSelector == null) {
return;
}
- long[] timeVector = timeSelector.getLongVector();
- Object[] objectsWhichMightBeStrings = valueSelector.getObjectVector();
+ final long[] timeVector = timeSelector.getLongVector();
+ final boolean[] nullTimeVector = timeSelector.getNullVector();
+ final Object[] objectsWhichMightBeStrings =
valueSelector.getObjectVector();
// iterate once over the object vector to find first non null element and
// determine if the type is Pair or not
boolean foldNeeded = false;
for (Object obj : objectsWhichMightBeStrings) {
- if (obj == null) {
- continue;
- } else {
+ if (obj != null) {
foldNeeded = StringFirstLastUtils.objectNeedsFoldCheck(obj);
break;
}
}
for (int i = 0; i < numRows; i++) {
+ if (nullTimeVector != null && nullTimeVector[i]) {
+ continue;
+ }
int position = positions[i] + positionOffset;
int row = rows == null ? i : rows[i];
long lastTime = buf.getLong(position);
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
index f144552d57e..da79faae3c1 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregatorTest.java
@@ -23,7 +23,9 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
@@ -49,11 +51,13 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
{
private static final double EPSILON = 1e-5;
private static final String[] VALUES = new String[]{"a", "b", null, "c"};
+ private static final int[] DICT_VALUES = new int[]{1, 2, 0, 3};
private static final long[] LONG_VALUES = new long[]{1L, 2L, 3L, 4L};
private static final String[] STRING_VALUES = new String[]{"1", "2", "3",
"4"};
private static final float[] FLOAT_VALUES = new float[]{1.0f, 2.0f, 3.0f,
4.0f};
private static final double[] DOUBLE_VALUES = new double[]{1.0, 2.0, 3.0,
4.0};
private static final boolean[] NULLS = new boolean[]{false, false, true,
false};
+ private static final boolean[] NULLS1 = new boolean[]{false, false};
private static final String NAME = "NAME";
private static final String FIELD_NAME = "FIELD_NAME";
private static final String FIELD_NAME_LONG = "LONG_NAME";
@@ -74,6 +78,7 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
private StringLastAggregatorFactory stringLastAggregatorFactory;
private StringLastAggregatorFactory stringLastAggregatorFactory1;
+ private SingleStringLastDimensionVectorAggregator targetSingleDim;
private VectorColumnSelectorFactory selectorFactory;
@@ -96,7 +101,7 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
@Override
public boolean[] getNullVector()
{
- return NULLS;
+ return null;
}
};
nonStringValueSelector = new BaseLongVectorValueSelector(new
NoFilterVectorOffset(
@@ -163,9 +168,9 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
}
};
BaseLongVectorValueSelector timeSelectorForPairs = new
BaseLongVectorValueSelector(new NoFilterVectorOffset(
- times.length,
+ timesSame.length,
0,
- times.length
+ timesSame.length
))
{
@Override
@@ -178,7 +183,7 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
@Override
public boolean[] getNullVector()
{
- return new boolean[0];
+ return NULLS1;
}
};
VectorObjectSelector selectorForPairs = new VectorObjectSelector()
@@ -212,7 +217,61 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
@Override
public SingleValueDimensionVectorSelector
makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
{
- return null;
+ return new SingleValueDimensionVectorSelector()
+ {
+ @Override
+ public int[] getRowVector()
+ {
+ return DICT_VALUES;
+ }
+
+ @Override
+ public int getValueCardinality()
+ {
+ return DICT_VALUES.length;
+ }
+
+ @Nullable
+ @Override
+ public String lookupName(int id)
+ {
+ switch (id) {
+ case 1:
+ return "a";
+ case 2:
+ return "b";
+ case 3:
+ return "c";
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public boolean nameLookupPossibleInAdvance()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public IdLookup idLookup()
+ {
+ return null;
+ }
+
+ @Override
+ public int getMaxVectorSize()
+ {
+ return DICT_VALUES.length;
+ }
+
+ @Override
+ public int getCurrentVectorSize()
+ {
+ return DICT_VALUES.length;
+ }
+ };
}
@Override
@@ -257,6 +316,8 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
target = new StringLastVectorAggregator(timeSelector, selector, 10);
targetWithPairs = new StringLastVectorAggregator(timeSelectorForPairs,
selectorForPairs, 10);
+ targetSingleDim = new
SingleStringLastDimensionVectorAggregator(timeSelector,
selectorFactory.makeSingleValueDimensionSelector(
+ DefaultDimensionSpec.of(FIELD_NAME)), 10);
clearBufferForPositions(0, 0);
@@ -361,6 +422,44 @@ public class StringLastVectorAggregatorTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void aggregateSingleDim()
+ {
+ targetSingleDim.aggregate(buf, 0, 0, VALUES.length);
+ Pair<Long, String> result = (Pair<Long, String>) targetSingleDim.get(buf,
0);
+ Assert.assertEquals(times[3], result.lhs.longValue());
+ Assert.assertEquals(VALUES[3], result.rhs);
+ }
+
+ @Test
+ public void aggregateBatchWithoutRowsSingleDim()
+ {
+ int[] positions = new int[]{0, 43, 70};
+ int positionOffset = 2;
+ clearBufferForPositions(positionOffset, positions);
+ targetSingleDim.aggregate(buf, 3, positions, null, positionOffset);
+ for (int i = 0; i < positions.length; i++) {
+ Pair<Long, String> result = (Pair<Long, String>)
targetSingleDim.get(buf, positions[i] + positionOffset);
+ Assert.assertEquals(times[i], result.lhs.longValue());
+ Assert.assertEquals(VALUES[i], result.rhs);
+ }
+ }
+
+ @Test
+ public void aggregateBatchWithRowsSingleDim()
+ {
+ int[] positions = new int[]{0, 43, 70};
+ int[] rows = new int[]{3, 2, 0};
+ int positionOffset = 2;
+ clearBufferForPositions(positionOffset, positions);
+ targetSingleDim.aggregate(buf, 3, positions, rows, positionOffset);
+ for (int i = 0; i < positions.length; i++) {
+ Pair<Long, String> result = (Pair<Long, String>)
targetSingleDim.get(buf, positions[i] + positionOffset);
+ Assert.assertEquals(times[rows[i]], result.lhs.longValue());
+ Assert.assertEquals(VALUES[rows[i]], result.rhs);
+ }
+ }
+
private void clearBufferForPositions(int offset, int... positions)
{
for (int position : positions) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]