imply-cheddar commented on code in PR #14542:
URL: https://github.com/apache/druid/pull/14542#discussion_r1261940048


##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java:
##########
@@ -221,11 +226,21 @@ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSele
             }
           };
           break;
+        case ARRAY:
+          final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities);
+          updater = sketch -> {
+            final Object o = selector.getObject();
+            if (o != null) {
+              byte[] bytes = ExprEval.toBytes(expressionType, o);
+              sketch.get().update(bytes);
+            }
+          };
+          break;

Review Comment:
   If `processAsArray` is false, should this fall through to the object logic instead?
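
   A hypothetical sketch of the fall-through being suggested, in plain Java. Druid's `HllSketch` and `ExprEval` encoding are mocked here as a list of recorded updates; all names are illustrative, not from the PR:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Illustrative only: Druid's HllSketch/ExprEval are replaced by a recorder.
   class ArrayFallThroughSketch
   {
     // Stand-in for sketch.update(...): records what the sketch would be fed.
     static final List<String> updates = new ArrayList<>();

     // Suggested shape: take the array-as-bytes path only when processAsArray
     // is true; otherwise fall through to per-element object handling.
     static void update(Object value, boolean processAsArray)
     {
       if (value instanceof Object[] && processAsArray) {
         // real code would be sketch.update(ExprEval.toBytes(expressionType, o))
         updates.add("bytes:" + Arrays.deepToString((Object[]) value));
       } else if (value instanceof Object[]) {
         for (Object o : (Object[]) value) {
           update(o, false);
         }
       } else if (value != null) {
         updates.add("object:" + value);
       }
     }

     public static void main(String[] args)
     {
       update(new Object[]{"a", "b"}, true);   // one update with the whole array
       update(new Object[]{"a", "b"}, false);  // two per-element updates
       System.out.println(updates);
     }
   }
   ```

   With the flag on, the whole array is one distinct value; with it off, each element counts separately, which is the behavioral difference the fall-through would preserve.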



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java:
##########
@@ -193,7 +193,8 @@ public Aggregation toDruidAggregation(
             tgtHllType,
             stringEncoding,
             finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
-            ROUND
+            ROUND,
+            false

Review Comment:
   This makes me think that the only behavior I can get from SQL is to use the processAsArray = false option.  I had expected a SqlOperatorConversion change to add an extra optional parameter.  Does that exist and I'm just blind?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java:
##########
@@ -33,19 +34,34 @@
 
 public class HllSketchBuildUtil
 {
-  public static void updateSketch(final HllSketch sketch, final StringEncoding stringEncoding, final Object value)
+  public static void updateSketch(
+      final HllSketch sketch,
+      final StringEncoding stringEncoding,
+      final Object value,
+      final boolean processAsArray
+  )
   {
     if (value instanceof Integer || value instanceof Long) {
       sketch.update(((Number) value).longValue());
     } else if (value instanceof Float || value instanceof Double) {
       sketch.update(((Number) value).doubleValue());
     } else if (value instanceof String) {
       updateSketchWithString(sketch, stringEncoding, (String) value);
+    } else if (value instanceof Object[] && processAsArray) {
+      byte[] arrayBytes = ExprEval.toBytesBestEffort(value);
+      sketch.update(arrayBytes);
     } else if (value instanceof List) {

Review Comment:
   If `processAsArray` is false, this case will fail with "Unsupported type".  Should it instead iterate the object array and recursively call this `updateSketch()` method?
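
   A sketch of the recursion being proposed, assuming the same type-dispatch shape as `updateSketch()`. The Druid sketch is mocked as a list and `ExprEval.toBytesBestEffort` is not reproduced; this is illustrative, not the PR's code:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Illustrative only: mirrors the dispatch shape of updateSketch() with the
   // sketch mocked as a list of recorded updates.
   class RecursiveUpdateSketch
   {
     static final List<String> sketch = new ArrayList<>();

     static void updateSketch(Object value, boolean processAsArray)
     {
       if (value instanceof Integer || value instanceof Long) {
         sketch.add("long:" + ((Number) value).longValue());
       } else if (value instanceof String) {
         sketch.add("string:" + value);
       } else if (value instanceof Object[]) {
         if (processAsArray) {
           // real code: sketch.update(ExprEval.toBytesBestEffort(value))
           sketch.add("arrayBytes:" + Arrays.deepToString((Object[]) value));
         } else {
           // suggested alternative: recurse per element instead of falling
           // through to the "Unsupported type" failure
           for (Object element : (Object[]) value) {
             updateSketch(element, false);
           }
         }
       } else {
         throw new IllegalArgumentException("Unsupported type " + value);
       }
     }

     public static void main(String[] args)
     {
       updateSketch(new Object[]{1L, "a"}, false);
       System.out.println(sketch); // [long:1, string:a]
     }
   }
   ```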



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java:
##########
@@ -83,6 +89,48 @@ public HllSketchBuildVectorProcessor makeLongProcessor(ColumnCapabilities capabi
     return new LongHllSketchBuildVectorProcessor(helper, selector);
   }
 
+  @Override
+  public HllSketchBuildVectorProcessor makeArrayProcessor(
+      ColumnCapabilities capabilities,
+      VectorObjectSelector selector
+  )
+  {
+    final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities);
+    return new HllSketchBuildVectorProcessor()
+    {
+      @Override
+      public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+      {
+        final Object[] vector = selector.getObjectVector();
+        final HllSketch sketch = helper.getSketchAtPosition(buf, position);
+
+        for (int i = startRow; i < endRow; i++) {
+          if (vector[i] != null) {
+            byte[] bytes = ExprEval.toBytes(expressionType, vector[i]);
+            sketch.update(bytes);
+          }
+        }
+      }
+
+      @Override
+      public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
+      {
+        final Object[] vector = selector.getObjectVector();
+
+        for (int i = 0; i < numRows; i++) {
+          final int idx = rows != null ? rows[i] : i;
+          final int position = positions[i] + positionOffset;
+          final HllSketch sketch = helper.getSketchAtPosition(buf, position);
+
+          if (vector[idx] != null) {
+            byte[] bytes = ExprEval.toBytes(expressionType, vector[idx]);
+            sketch.update(bytes);
+          }
+        }
+      }
+    };
+  }
+

Review Comment:
   These don't seem to honor the `processAsArray` flag; is that done somewhere else?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java:
##########
@@ -148,12 +151,21 @@ static void updateUnion(Union union, Object update)
       union.update((int[]) update);
     } else if (update instanceof long[]) {
       union.update((long[]) update);
+    } else if (update instanceof Object[]) {
+      final byte[] arrayBytes = ExprEval.toBytesBestEffort(update);
+      union.update(arrayBytes);

Review Comment:
   Same question here about supporting `processAsArray`?



##########
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java:
##########
@@ -165,6 +169,32 @@ public boolean applyNull()
               }
             };
           }
+
+          @Override
+          public Predicate<Object[]> makeArrayPredicate(@Nullable TypeSignature<ValueType> arrayType)
+          {
+            final ExpressionType expressionType = arrayType == null
+                                                  ? null
+                                                  : ExpressionType.fromColumnTypeStrict(arrayType);
+            if (expressionType != null) {
+              return input -> {
+                if (input == null) {
+                  return bloomKFilter.testBytes(null, 0, 0);
+                }
+                final byte[] bytes = ExprEval.toBytes(expressionType, input);
+                return bloomKFilter.testBytes(bytes);
+              };
+            } else {
+              // fall back to per row detection

Review Comment:
   "When expressionType is null, we expect to primarily be in an ingestion case 
where there is a higher likelihood that each row is different data.  So, check 
the type per row". ?



##########
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java:
##########
@@ -165,6 +169,32 @@ public boolean applyNull()
               }
             };
           }
+
+          @Override
+          public Predicate<Object[]> makeArrayPredicate(@Nullable TypeSignature<ValueType> arrayType)
+          {
+            final ExpressionType expressionType = arrayType == null
+                                                  ? null
+                                                  : ExpressionType.fromColumnTypeStrict(arrayType);
+            if (expressionType != null) {

Review Comment:
   Nit: invert the negation, please.



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java:
##########
@@ -417,6 +440,153 @@ public void testPostAggs() throws Exception
     Assert.assertEquals(expectedSummary, ((HllSketchHolder) row.get(4)).getSketch().toString());
   }
 
+  @Test
+  public void testArrays() throws Exception
+  {
+    AggregatorFactory[] aggs = new AggregatorFactory[]{
+            new HllSketchBuildAggregatorFactory("hll0", "arrayString", null, null, null, false, false, true),
+            new HllSketchBuildAggregatorFactory("hll1", "arrayLong", null, null, null, false, false, true),
+            new HllSketchBuildAggregatorFactory("hll2", "arrayDouble", null, null, null, false, false, true),
+            new HllSketchBuildAggregatorFactory("hll3", "arrayString", null, null, null, false, false, false),
+            new HllSketchBuildAggregatorFactory("hll4", "arrayLong", null, null, null, false, false, false),
+            new HllSketchBuildAggregatorFactory("hll5", "arrayDouble", null, null, null, false, false, false)
+    };
+
+    IndexBuilder bob = IndexBuilder.create(timeseriesHelper.getObjectMapper())
+                                   .tmpDir(groupByFolder.newFolder())
+                                   .schema(
+                                       IncrementalIndexSchema.builder()
+                                                             .withTimestampSpec(NestedDataTestUtils.TIMESTAMP_SPEC)
+                                                             .withDimensionsSpec(NestedDataTestUtils.AUTO_DISCOVERY)
+                                                             .withMetrics(aggs)
+                                                             .withQueryGranularity(Granularities.NONE)
+                                                             .withRollup(true)
+                                                             .withMinTimestamp(0)
+                                                             .build()
+                                   )
+                                   .inputSource(
+                                       ResourceInputSource.of(
+                                           NestedDataTestUtils.class.getClassLoader(),
+                                           NestedDataTestUtils.ARRAY_TYPES_DATA_FILE
+                                       )
+                                   )
+                                   .inputFormat(NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT)
+                                   .transform(TransformSpec.NONE)
+                                   .inputTmpDir(groupByFolder.newFolder());
+
+    List<Segment> realtimeSegs = ImmutableList.of(
+        new IncrementalIndexSegment(bob.buildIncrementalIndex(), SegmentId.dummy("test_datasource"))
+    );
+    List<Segment> segs = ImmutableList.of(
+        new QueryableIndexSegment(bob.buildMMappedMergedIndex(), SegmentId.dummy("test_datasource"))
+    );
+
+    GroupByQuery query = GroupByQuery.builder()
+                                     .setDataSource("test_datasource")
+                                     .setGranularity(Granularities.ALL)
+                                     .setInterval(Intervals.ETERNITY)
+                                     .setAggregatorSpecs(
+                                         new HllSketchBuildAggregatorFactory("a0", "arrayString", null, null, null, false, false, false),
+                                         new HllSketchBuildAggregatorFactory("a1", "arrayLong", null, null, null, false, false, false),
+                                         new HllSketchBuildAggregatorFactory("a2", "arrayDouble", null, null, null, false, false, false),
+                                         new HllSketchMergeAggregatorFactory("a3", "hll0", null, null, null, false, false),
+                                         new HllSketchMergeAggregatorFactory("a4", "hll1", null, null, null, false, false),
+                                         new HllSketchMergeAggregatorFactory("a5", "hll2", null, null, null, false, false),
+                                         new HllSketchMergeAggregatorFactory("a6", "hll3", null, null, null, false, false),
+                                         new HllSketchMergeAggregatorFactory("a7", "hll4", null, null, null, false, false),
+                                         new HllSketchMergeAggregatorFactory("a8", "hll5", null, null, null, false, false),

Review Comment:
   I don't see tests covering the `processAsArray` case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

