rohangarg commented on code in PR #11201:
URL: https://github.com/apache/druid/pull/11201#discussion_r991500336


##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.DimensionDictionarySelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HllSketchBuildUtil
+{
+  public static void updateSketch(final HllSketch sketch, final StringEncoding 
stringEncoding, final Object value)
+  {
+    if (value instanceof Integer || value instanceof Long) {
+      sketch.update(((Number) value).longValue());
+    } else if (value instanceof Float || value instanceof Double) {
+      sketch.update(((Number) value).doubleValue());
+    } else if (value instanceof String) {
+      updateSketchWithString(sketch, stringEncoding, (String) value);
+    } else if (value instanceof List) {
+      // noinspection rawtypes
+      for (Object entry : (List) value) {
+        if (entry != null) {
+          final String asString = entry.toString();
+          if (!NullHandling.isNullOrEquivalent(asString)) {
+            updateSketchWithString(sketch, stringEncoding, asString);
+          }
+        }
+      }
+    } else if (value instanceof char[]) {
+      sketch.update((char[]) value);
+    } else if (value instanceof byte[]) {
+      sketch.update((byte[]) value);
+    } else if (value instanceof int[]) {
+      sketch.update((int[]) value);
+    } else if (value instanceof long[]) {
+      sketch.update((long[]) value);
+    } else {
+      throw new IAE("Unsupported type " + value.getClass());
+    }
+  }
+
+  public static void updateSketchWithDictionarySelector(
+      final HllSketch sketch,
+      final StringEncoding stringEncoding,
+      final DimensionDictionarySelector selector,
+      final int id
+  )
+  {
+    if (stringEncoding == StringEncoding.UTF8 && 
selector.supportsLookupNameUtf8()) {

Review Comment:
   Should we do this once per aggregator object itself? Probably the branch 
predictor will optimize it eventually, but it may take time. 



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.DimensionDictionarySelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HllSketchBuildUtil
+{
+  public static void updateSketch(final HllSketch sketch, final StringEncoding 
stringEncoding, final Object value)
+  {
+    if (value instanceof Integer || value instanceof Long) {
+      sketch.update(((Number) value).longValue());
+    } else if (value instanceof Float || value instanceof Double) {
+      sketch.update(((Number) value).doubleValue());
+    } else if (value instanceof String) {
+      updateSketchWithString(sketch, stringEncoding, (String) value);
+    } else if (value instanceof List) {
+      // noinspection rawtypes
+      for (Object entry : (List) value) {
+        if (entry != null) {
+          final String asString = entry.toString();
+          if (!NullHandling.isNullOrEquivalent(asString)) {
+            updateSketchWithString(sketch, stringEncoding, asString);
+          }
+        }
+      }
+    } else if (value instanceof char[]) {
+      sketch.update((char[]) value);
+    } else if (value instanceof byte[]) {
+      sketch.update((byte[]) value);
+    } else if (value instanceof int[]) {
+      sketch.update((int[]) value);
+    } else if (value instanceof long[]) {
+      sketch.update((long[]) value);
+    } else {
+      throw new IAE("Unsupported type " + value.getClass());
+    }
+  }
+
+  public static void updateSketchWithDictionarySelector(
+      final HllSketch sketch,
+      final StringEncoding stringEncoding,
+      final DimensionDictionarySelector selector,
+      final int id
+  )
+  {
+    if (stringEncoding == StringEncoding.UTF8 && 
selector.supportsLookupNameUtf8()) {
+      final ByteBuffer buf = selector.lookupNameUtf8(id);
+
+      if (buf != null) {
+        sketch.update(buf);
+      }
+    } else {
+      final String s = 
NullHandling.nullToEmptyIfNeeded(selector.lookupName(id));
+      updateSketchWithString(sketch, stringEncoding, s);

Review Comment:
   Should we add a `!NullHandling.isNullOrEquivalent(asString)` check as well? 
The earlier code seems to check that for all objects.



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java:
##########
@@ -74,11 +78,123 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
 {
   private static final boolean ROUND = true;
 
+  // For testHllSketchPostAggsGroupBy, testHllSketchPostAggsTimeseries
+  private static final Object[] EXPECTED_PA_RESULT =
+      new Object[]{
+          "\"AgEHDAMIAgDhUv8P63iABQ==\"",
+          "\"AgEHDAMIBgALpZ0PjpTfBY5ElQo+C7UE4jA+DKfcYQQ=\"",
+          "\"AgEHDAMIAQAr8vsG\"",
+          2.000000004967054d,
+          3.000000004967054d,
+          3.000000014901161d,
+          2.000000004967054d,
+          "[2.000000004967054,2.0,2.0001997319422404]",
+          "[2.000000004967054,2.0,2.000099863468538]",
+          "\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"",
+          2L,
+          "### HLL SKETCH SUMMARY: \n"
+            + "  Log Config K   : 12\n"
+            + "  Hll Target     : HLL_4\n"
+            + "  Current Mode   : LIST\n"
+            + "  Memory         : false\n"
+            + "  LB             : 2.0\n"
+            + "  Estimate       : 2.000000004967054\n"
+            + "  UB             : 2.000099863468538\n"
+            + "  OutOfOrder Flag: false\n"
+            + "  Coupon Count   : 2\n",
+          "### HLL SKETCH SUMMARY: \n"
+            + "  LOG CONFIG K   : 12\n"
+            + "  HLL TARGET     : HLL_4\n"
+            + "  CURRENT MODE   : LIST\n"
+            + "  MEMORY         : FALSE\n"
+            + "  LB             : 2.0\n"
+            + "  ESTIMATE       : 2.000000004967054\n"
+            + "  UB             : 2.000099863468538\n"
+            + "  OUTOFORDER FLAG: FALSE\n"
+            + "  COUPON COUNT   : 2\n",
+          2.0
+      };
+
+  private static final List<VirtualColumn> EXPECTED_PA_VIRTUAL_COLUMNS =
+      ImmutableList.of(
+          new ExpressionVirtualColumn(
+              "v0",
+              "concat(\"dim2\",'hello')",
+              ColumnType.STRING,
+              TestExprMacroTable.INSTANCE
+          ),
+          new ExpressionVirtualColumn(
+              "v1",
+              "pow(abs((\"m1\" + 100)),2)",
+              ColumnType.DOUBLE,
+              TestExprMacroTable.INSTANCE
+          )
+      );
+
+  private static final List<AggregatorFactory> EXPECTED_PA_AGGREGATORS =

Review Comment:
   what does `PA` signify here?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/LongHllSketchBuildVectorProcessor.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll.vector;
+
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.common.config.NullHandling;
+import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class LongHllSketchBuildVectorProcessor implements 
HllSketchBuildVectorProcessor
+{
+  private final HllSketchBuildBufferAggregatorHelper helper;
+  private final VectorValueSelector selector;
+
+  public LongHllSketchBuildVectorProcessor(
+      final HllSketchBuildBufferAggregatorHelper helper,
+      final VectorValueSelector selector
+  )
+  {
+    this.helper = helper;
+    this.selector = selector;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    final long[] vector = selector.getLongVector();
+    final boolean[] nullVector = selector.getNullVector();
+
+    final HllSketch sketch = helper.getSketchAtPosition(buf, position);
+
+    for (int i = startRow; i < endRow; i++) {
+      if (NullHandling.replaceWithDefault() || nullVector == null || 
!nullVector[i]) {
+        sketch.update(vector[i]);
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int numRows, int[] positions, 
@Nullable int[] rows, int positionOffset)
+  {
+    final long[] vector = selector.getLongVector();
+    final boolean[] nullVector = selector.getNullVector();
+
+    for (int i = 0; i < numRows; i++) {
+      final int idx = rows != null ? rows[i] : i;
+      if (NullHandling.replaceWithDefault() || nullVector == null || 
!nullVector[idx]) {

Review Comment:
   Having a separate loop for the `NullHandling.replaceWithDefault() || nullVector 
== null` case, which doesn't check anything per row, might be faster. But it 
could be considered later.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -41,6 +42,12 @@
 /**
  * This aggregator factory is for merging existing sketches.
  * The input column must contain {@link HllSketch}
+ *
+ * Note: aggregators generated by this class do not directly use 
"stringEncoding", but it is part of this class
+ * anyway so we can preserve enough information to ensure that we are merging 
sketches in a valid way. (Sketches with
+ * incompatible string encodings cannot be merged meaningfully.) Currently, 
the only way this is exposed is through
+ * {@link #getMergingFactory}, which will throw {@link 
AggregatorFactoryNotMergeableException} if presented with
+ * two aggregators with two different encodings.

Review Comment:
   Currently, I think there is no way to detect if someone is accidentally 
merging sketches with different encodings at query time (let's say some sketches 
were ingested with encoding e1 and then at query time they got merged with 
encoding e2 sketches). Probably, that would require augmented state in the 
aggregator serialization itself, which is again hard to do with respect to 
backward compatibility.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java:
##########
@@ -201,62 +216,64 @@ public Comparator<HllSketch> getComparator()
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), 
getTgtHllType(), isRound());
+    return new HllSketchMergeAggregatorFactory(
+        getName(),
+        getName(),
+        getLgK(),
+        getTgtHllType(),
+        getStringEncoding(),
+        isRound()
+    );
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    return new 
CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName)
-                                                
.appendInt(lgK).appendInt(tgtHllType.ordinal()).build();
+    return new CacheKeyBuilder(getCacheTypeId())
+        .appendString(name)
+        .appendString(fieldName)
+        .appendInt(lgK)
+        .appendInt(tgtHllType.ordinal())
+        .appendInt(stringEncoding.ordinal())

Review Comment:
   Should we use `stringEncoding.getCacheKey()` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to