imply-cheddar commented on code in PR #14542:
URL: https://github.com/apache/druid/pull/14542#discussion_r1261940048
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java:
##########
@@ -221,11 +226,21 @@ private HllSketchUpdater
formulateSketchUpdater(ColumnSelectorFactory columnSele
}
};
break;
+ case ARRAY:
+ final ExpressionType expressionType =
ExpressionType.fromColumnTypeStrict(capabilities);
+ updater = sketch -> {
+ final Object o = selector.getObject();
+ if (o != null) {
+ byte[] bytes = ExprEval.toBytes(expressionType, o);
+ sketch.get().update(bytes);
+ }
+ };
+ break;
Review Comment:
If `processAsArray` is false, should this fall through to the object logic
instead?
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java:
##########
@@ -193,7 +193,8 @@ public Aggregation toDruidAggregation(
tgtHllType,
stringEncoding,
finalizeSketch ||
SketchQueryContext.isFinalizeOuterSketches(plannerContext),
- ROUND
+ ROUND,
+ false
Review Comment:
This makes me think that the only behavior I can get from SQL is the
`processAsArray = false` option. I had expected a SqlOperatorConversion change
that adds an extra optional parameter. Does that exist, and I'm just missing it?
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java:
##########
@@ -33,19 +34,34 @@
public class HllSketchBuildUtil
{
- public static void updateSketch(final HllSketch sketch, final StringEncoding
stringEncoding, final Object value)
+ public static void updateSketch(
+ final HllSketch sketch,
+ final StringEncoding stringEncoding,
+ final Object value,
+ final boolean processAsArray
+ )
{
if (value instanceof Integer || value instanceof Long) {
sketch.update(((Number) value).longValue());
} else if (value instanceof Float || value instanceof Double) {
sketch.update(((Number) value).doubleValue());
} else if (value instanceof String) {
updateSketchWithString(sketch, stringEncoding, (String) value);
+ } else if (value instanceof Object[] && processAsArray) {
+ byte[] arrayBytes = ExprEval.toBytesBestEffort(value);
+ sketch.update(arrayBytes);
} else if (value instanceof List) {
Review Comment:
If `processAsArray` is false, this case will fail with "Unsupported type".
Should it iterate over the object array and recursively call this
`updateSketch()` method instead?
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java:
##########
@@ -83,6 +89,48 @@ public HllSketchBuildVectorProcessor
makeLongProcessor(ColumnCapabilities capabi
return new LongHllSketchBuildVectorProcessor(helper, selector);
}
+ @Override
+ public HllSketchBuildVectorProcessor makeArrayProcessor(
+ ColumnCapabilities capabilities,
+ VectorObjectSelector selector
+ )
+ {
+ final ExpressionType expressionType =
ExpressionType.fromColumnTypeStrict(capabilities);
+ return new HllSketchBuildVectorProcessor()
+ {
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int
endRow)
+ {
+ final Object[] vector = selector.getObjectVector();
+ final HllSketch sketch = helper.getSketchAtPosition(buf, position);
+
+ for (int i = startRow; i < endRow; i++) {
+ if (vector[i] != null) {
+ byte[] bytes = ExprEval.toBytes(expressionType, vector[i]);
+ sketch.update(bytes);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int numRows, int[] positions,
@Nullable int[] rows, int positionOffset)
+ {
+ final Object[] vector = selector.getObjectVector();
+
+ for (int i = 0; i < numRows; i++) {
+ final int idx = rows != null ? rows[i] : i;
+ final int position = positions[i] + positionOffset;
+ final HllSketch sketch = helper.getSketchAtPosition(buf, position);
+
+ if (vector[idx] != null) {
+ byte[] bytes = ExprEval.toBytes(expressionType, vector[idx]);
+ sketch.update(bytes);
+ }
+ }
+ }
+ };
+ }
+
Review Comment:
These don't seem to honor the `processAsArray` flag; is that handled
somewhere else?
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java:
##########
@@ -148,12 +151,21 @@ static void updateUnion(Union union, Object update)
union.update((int[]) update);
} else if (update instanceof long[]) {
union.update((long[]) update);
+ } else if (update instanceof Object[]) {
+ final byte[] arrayBytes = ExprEval.toBytesBestEffort(update);
+ union.update(arrayBytes);
Review Comment:
Same question here: is the `processAsArray` flag supported?
##########
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java:
##########
@@ -165,6 +169,32 @@ public boolean applyNull()
}
};
}
+
+ @Override
+ public Predicate<Object[]> makeArrayPredicate(@Nullable
TypeSignature<ValueType> arrayType)
+ {
+ final ExpressionType expressionType = arrayType == null
+ ? null
+ :
ExpressionType.fromColumnTypeStrict(arrayType);
+ if (expressionType != null) {
+ return input -> {
+ if (input == null) {
+ return bloomKFilter.testBytes(null, 0, 0);
+ }
+ final byte[] bytes = ExprEval.toBytes(expressionType, input);
+ return bloomKFilter.testBytes(bytes);
+ };
+ } else {
+ // fall back to per row detection
Review Comment:
Maybe reword the comment to: "When expressionType is null, we expect to primarily be in an ingestion case,
where each row is more likely to contain a different type of data, so check
the type per row"?
##########
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java:
##########
@@ -165,6 +169,32 @@ public boolean applyNull()
}
};
}
+
+ @Override
+ public Predicate<Object[]> makeArrayPredicate(@Nullable
TypeSignature<ValueType> arrayType)
+ {
+ final ExpressionType expressionType = arrayType == null
+ ? null
+ :
ExpressionType.fromColumnTypeStrict(arrayType);
+ if (expressionType != null) {
Review Comment:
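nit: please invert the condition so it checks `expressionType == null` first (handling the fallback in that branch) instead of testing the negation.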
##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java:
##########
@@ -417,6 +440,153 @@ public void testPostAggs() throws Exception
Assert.assertEquals(expectedSummary, ((HllSketchHolder)
row.get(4)).getSketch().toString());
}
+ @Test
+ public void testArrays() throws Exception
+ {
+ AggregatorFactory[] aggs = new AggregatorFactory[]{
+ new HllSketchBuildAggregatorFactory("hll0", "arrayString", null,
null, null, false, false, true),
+ new HllSketchBuildAggregatorFactory("hll1", "arrayLong", null,
null, null, false, false, true),
+ new HllSketchBuildAggregatorFactory("hll2", "arrayDouble", null,
null, null, false, false, true),
+ new HllSketchBuildAggregatorFactory("hll3", "arrayString", null,
null, null, false, false, false),
+ new HllSketchBuildAggregatorFactory("hll4", "arrayLong", null,
null, null, false, false, false),
+ new HllSketchBuildAggregatorFactory("hll5", "arrayDouble", null,
null, null, false, false, false)
+ };
+
+ IndexBuilder bob = IndexBuilder.create(timeseriesHelper.getObjectMapper())
+ .tmpDir(groupByFolder.newFolder())
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withTimestampSpec(NestedDataTestUtils.TIMESTAMP_SPEC)
+
.withDimensionsSpec(NestedDataTestUtils.AUTO_DISCOVERY)
+ .withMetrics(aggs)
+
.withQueryGranularity(Granularities.NONE)
+ .withRollup(true)
+
.withMinTimestamp(0)
+ .build()
+ )
+ .inputSource(
+ ResourceInputSource.of(
+
NestedDataTestUtils.class.getClassLoader(),
+
NestedDataTestUtils.ARRAY_TYPES_DATA_FILE
+ )
+ )
+
.inputFormat(NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT)
+ .transform(TransformSpec.NONE)
+ .inputTmpDir(groupByFolder.newFolder());
+
+ List<Segment> realtimeSegs = ImmutableList.of(
+ new IncrementalIndexSegment(bob.buildIncrementalIndex(),
SegmentId.dummy("test_datasource"))
+ );
+ List<Segment> segs = ImmutableList.of(
+ new QueryableIndexSegment(bob.buildMMappedMergedIndex(),
SegmentId.dummy("test_datasource"))
+ );
+
+ GroupByQuery query = GroupByQuery.builder()
+ .setDataSource("test_datasource")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .setAggregatorSpecs(
+ new
HllSketchBuildAggregatorFactory("a0", "arrayString", null, null, null, false,
false, false),
+ new
HllSketchBuildAggregatorFactory("a1", "arrayLong", null, null, null, false,
false, false),
+ new
HllSketchBuildAggregatorFactory("a2", "arrayDouble", null, null, null, false,
false, false),
+ new
HllSketchMergeAggregatorFactory("a3", "hll0", null, null, null, false, false),
+ new
HllSketchMergeAggregatorFactory("a4", "hll1", null, null, null, false, false),
+ new
HllSketchMergeAggregatorFactory("a5", "hll2", null, null, null, false, false),
+ new
HllSketchMergeAggregatorFactory("a6", "hll3", null, null, null, false, false),
+ new
HllSketchMergeAggregatorFactory("a7", "hll4", null, null, null, false, false),
+ new
HllSketchMergeAggregatorFactory("a8", "hll5", null, null, null, false, false),
Review Comment:
I don't see any tests covering the `processAsArray` case; did I miss them?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]