This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 66cac08a52 Refactor HllSketchBuildAggregatorFactory (#14544)
66cac08a52 is described below

commit 66cac08a52cc357a4c775dbb64919941f7733c57
Author: imply-cheddar <[email protected]>
AuthorDate: Tue Jul 11 01:57:09 2023 +0900

    Refactor HllSketchBuildAggregatorFactory (#14544)
    
    * Refactor HllSketchBuildAggregatorFactory
    
    The usage of ColumnProcessors and HllSketchBuildColumnProcessorFactory
    made it very difficult to figure out what was going on from just looking
    at the AggregatorFactory or Aggregator code.  It also didn't properly
    double check that you could use UTF8 ahead of time, even though it's
    entirely possible to validate it before trying to use it.  This refactor
    keeps the general indirection that had been implemented by
    the Consumer<Supplier<HllSketch>> but centralizes the decision logic and
    makes it easier to understand the code.
    
    * Test fixes
    
    * Add test that validates the types are maintained
    
    * Add back indirection to avoid buffer calls
    
    * Cover that floats and doubles are treated as the same thing
    
    * Static checks
---
 .../datasketches/hll/HllSketchBuildAggregator.java |  11 +-
 .../hll/HllSketchBuildAggregatorFactory.java       |  99 ++++++--
 .../hll/HllSketchBuildBufferAggregator.java        |  14 +-
 .../hll/HllSketchBuildColumnProcessorFactory.java  | 110 --------
 .../datasketches/hll/HllSketchUpdater.java         |  29 +++
 .../hll/sql/HllSketchSqlAggregatorTest.java        | 276 ++++++++++++++-------
 6 files changed, 307 insertions(+), 232 deletions(-)

diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
index 7a086b7257..211078bd75 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
@@ -23,25 +23,22 @@ import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.druid.query.aggregation.Aggregator;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
 /**
  * This aggregator builds sketches from raw data.
  * The input column can contain identifiers of type string, char[], byte[] or 
any numeric type.
  */
 public class HllSketchBuildAggregator implements Aggregator
 {
-  private final Consumer<Supplier<HllSketch>> processor;
+  private final HllSketchUpdater updater;
   private HllSketch sketch;
 
   public HllSketchBuildAggregator(
-      final Consumer<Supplier<HllSketch>> processor,
+      final HllSketchUpdater updater,
       final int lgK,
       final TgtHllType tgtHllType
   )
   {
-    this.processor = processor;
+    this.updater = updater;
     this.sketch = new HllSketch(lgK, tgtHllType);
   }
 
@@ -53,7 +50,7 @@ public class HllSketchBuildAggregator implements Aggregator
   @Override
   public synchronized void aggregate()
   {
-    processor.accept(() -> sketch);
+    updater.update(() -> sketch);
   }
 
   /*
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
index 2762d007b2..35370648f0 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
@@ -30,22 +30,25 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.ColumnProcessors;
 import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.nio.ByteBuffer;
 
 /**
  * This aggregator factory is for building sketches from raw data.
  * The input column can contain identifiers of type string, char[], byte[] or 
any numeric type.
  */
+@SuppressWarnings("NullableProblems")
 public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
 {
   public static final ColumnType TYPE = 
ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME);
@@ -80,16 +83,8 @@ public class HllSketchBuildAggregatorFactory extends 
HllSketchAggregatorFactory
   @Override
   public Aggregator factorize(final ColumnSelectorFactory 
columnSelectorFactory)
   {
-    
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
-
-    final Consumer<Supplier<HllSketch>> processor = 
ColumnProcessors.makeProcessor(
-        getFieldName(),
-        new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
-        columnSelectorFactory
-    );
-
     return new HllSketchBuildAggregator(
-        processor,
+        formulateSketchUpdater(columnSelectorFactory),
         getLgK(),
         TgtHllType.valueOf(getTgtHllType())
     );
@@ -98,16 +93,8 @@ public class HllSketchBuildAggregatorFactory extends 
HllSketchAggregatorFactory
   @Override
   public BufferAggregator factorizeBuffered(final ColumnSelectorFactory 
columnSelectorFactory)
   {
-    
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
-
-    final Consumer<Supplier<HllSketch>> processor = 
ColumnProcessors.makeProcessor(
-        getFieldName(),
-        new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
-        columnSelectorFactory
-    );
-
     return new HllSketchBuildBufferAggregator(
-        processor,
+        formulateSketchUpdater(columnSelectorFactory),
         getLgK(),
         TgtHllType.valueOf(getTgtHllType()),
         getStringEncoding(),
@@ -175,4 +162,74 @@ public class HllSketchBuildAggregatorFactory extends 
HllSketchAggregatorFactory
     }
   }
 
+  private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory 
columnSelectorFactory)
+  {
+    final ColumnCapabilities capabilities = 
columnSelectorFactory.getColumnCapabilities(getFieldName());
+    validateInputs(capabilities);
+
+    HllSketchUpdater updater = null;
+    if (capabilities != null &&
+        StringEncoding.UTF8.equals(getStringEncoding()) && 
ValueType.STRING.equals(capabilities.getType())) {
+      final DimensionSelector selector = 
columnSelectorFactory.makeDimensionSelector(
+          DefaultDimensionSpec.of(getFieldName())
+      );
+
+      if (selector.supportsLookupNameUtf8()) {
+        updater = sketch -> {
+          final IndexedInts row = selector.getRow();
+          final int sz = row.size();
+
+          for (int i = 0; i < sz; i++) {
+            final ByteBuffer buf = selector.lookupNameUtf8(row.get(i));
+
+            if (buf != null) {
+              sketch.get().update(buf);
+            }
+          }
+        };
+      }
+    }
+
+    if (updater == null) {
+      @SuppressWarnings("unchecked")
+      final ColumnValueSelector<Object> selector = 
columnSelectorFactory.makeColumnValueSelector(getFieldName());
+      final ValueType type;
+
+      if (capabilities == null) {
+        // When ingesting data, the columnSelectorFactory returns null for 
column capabilities, so this doesn't
+        // necessarily mean that the column doesn't exist.  We thus need to be 
prepared to accept anything in this
+        // case.  As such, we pretend like the input is COMPLEX to get the 
logic to use the object-based aggregation
+        type = ValueType.COMPLEX;
+      } else {
+        type = capabilities.getType();
+      }
+
+
+      switch (type) {
+        case LONG:
+          updater = sketch -> {
+            if (!selector.isNull()) {
+              sketch.get().update(selector.getLong());
+            }
+          };
+          break;
+        case FLOAT:
+        case DOUBLE:
+          updater = sketch -> {
+            if (!selector.isNull()) {
+              sketch.get().update(selector.getDouble());
+            }
+          };
+          break;
+        default:
+          updater = sketch -> {
+            Object obj = selector.getObject();
+            if (obj != null) {
+              HllSketchBuildUtil.updateSketch(sketch.get(), 
getStringEncoding(), obj);
+            }
+          };
+      }
+    }
+    return updater;
+  }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 10cde4aa25..623bab2df5 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -19,35 +19,33 @@
 
 package org.apache.druid.query.aggregation.datasketches.hll;
 
-import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.druid.java.util.common.StringEncoding;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 /**
  * This aggregator builds sketches from raw data.
  * The input column can contain identifiers of type string, char[], byte[] or 
any numeric type.
  */
+@SuppressWarnings("NullableProblems")
 public class HllSketchBuildBufferAggregator implements BufferAggregator
 {
-  private final Consumer<Supplier<HllSketch>> processor;
+  private final HllSketchUpdater updater;
   private final HllSketchBuildBufferAggregatorHelper helper;
   private final StringEncoding stringEncoding;
 
   public HllSketchBuildBufferAggregator(
-      final Consumer<Supplier<HllSketch>> processor,
+      final HllSketchUpdater updater,
       final int lgK,
       final TgtHllType tgtHllType,
       final StringEncoding stringEncoding,
       final int size
   )
   {
-    this.processor = processor;
+    this.updater = updater;
     this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, 
size);
     this.stringEncoding = stringEncoding;
   }
@@ -61,7 +59,7 @@ public class HllSketchBuildBufferAggregator implements 
BufferAggregator
   @Override
   public void aggregate(final ByteBuffer buf, final int position)
   {
-    processor.accept(() -> helper.getSketchAtPosition(buf, position));
+    updater.update(() -> helper.getSketchAtPosition(buf, position));
   }
 
   @Override
@@ -101,7 +99,7 @@ public class HllSketchBuildBufferAggregator implements 
BufferAggregator
   @Override
   public void inspectRuntimeShape(RuntimeShapeInspector inspector)
   {
-    inspector.visit("processor", processor);
+    inspector.visit("processor", updater);
     // lgK should be inspected because different execution paths exist in 
HllSketch.update() that is called from
     // @CalledFromHotLoop-annotated aggregate() depending on the lgK.
     // See https://github.com/apache/druid/pull/6893#discussion_r250726028
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
deleted file mode 100644
index d082388957..0000000000
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.datasketches.hll;
-
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.druid.java.util.common.StringEncoding;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-import org.apache.druid.segment.BaseObjectColumnValueSelector;
-import org.apache.druid.segment.ColumnProcessorFactory;
-import org.apache.druid.segment.DimensionSelector;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.data.IndexedInts;
-
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/**
- * Scalar (non-vectorized) column processor factory.
- */
-public class HllSketchBuildColumnProcessorFactory implements 
ColumnProcessorFactory<Consumer<Supplier<HllSketch>>>
-{
-  private final StringEncoding stringEncoding;
-
-  HllSketchBuildColumnProcessorFactory(StringEncoding stringEncoding)
-  {
-    this.stringEncoding = stringEncoding;
-  }
-
-  @Override
-  public ColumnType defaultType()
-  {
-    return ColumnType.STRING;
-  }
-
-  @Override
-  public Consumer<Supplier<HllSketch>> 
makeDimensionProcessor(DimensionSelector selector, boolean multiValue)
-  {
-    return sketch -> {
-      final IndexedInts row = selector.getRow();
-      final int sz = row.size();
-
-      for (int i = 0; i < sz; i++) {
-        HllSketchBuildUtil.updateSketchWithDictionarySelector(sketch.get(), 
stringEncoding, selector, row.get(i));
-      }
-    };
-  }
-
-  @Override
-  public Consumer<Supplier<HllSketch>> 
makeFloatProcessor(BaseFloatColumnValueSelector selector)
-  {
-    return sketch -> {
-      if (!selector.isNull()) {
-        // Important that this is *double* typed, since 
HllSketchBuildAggregator treats doubles and floats the same.
-        final double value = selector.getFloat();
-        sketch.get().update(value);
-      }
-    };
-  }
-
-  @Override
-  public Consumer<Supplier<HllSketch>> 
makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
-  {
-    return sketch -> {
-      if (!selector.isNull()) {
-        sketch.get().update(selector.getDouble());
-      }
-    };
-  }
-
-  @Override
-  public Consumer<Supplier<HllSketch>> 
makeLongProcessor(BaseLongColumnValueSelector selector)
-  {
-    return sketch -> {
-      if (!selector.isNull()) {
-        sketch.get().update(selector.getLong());
-      }
-    };
-  }
-
-  @Override
-  public Consumer<Supplier<HllSketch>> 
makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
-  {
-    return sketch -> {
-      final Object o = selector.getObject();
-
-      if (o != null) {
-        HllSketchBuildUtil.updateSketch(sketch.get(), stringEncoding, o);
-      }
-    };
-  }
-}
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
new file mode 100644
index 0000000000..146bf1055d
--- /dev/null
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+
+import java.util.function.Supplier;
+
+public interface HllSketchUpdater
+{
+  void update(Supplier<HllSketch> sketch);
+}
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 498bb06d9a..1431aa49a9 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.aggregation.datasketches.hll.sql;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Injector;
@@ -46,6 +47,7 @@ import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
 import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
 import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
 import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
+import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchUnionPostAggregator;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
@@ -109,26 +111,36 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
           "[2.000000004967054,2.0,2.000099863468538]",
           "\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"",
           2L,
-          "### HLL SKETCH SUMMARY: \n"
-            + "  Log Config K   : 12\n"
-            + "  Hll Target     : HLL_4\n"
-            + "  Current Mode   : LIST\n"
-            + "  Memory         : false\n"
-            + "  LB             : 2.0\n"
-            + "  Estimate       : 2.000000004967054\n"
-            + "  UB             : 2.000099863468538\n"
-            + "  OutOfOrder Flag: false\n"
-            + "  Coupon Count   : 2\n",
-          "### HLL SKETCH SUMMARY: \n"
-            + "  LOG CONFIG K   : 12\n"
-            + "  HLL TARGET     : HLL_4\n"
-            + "  CURRENT MODE   : LIST\n"
-            + "  MEMORY         : FALSE\n"
-            + "  LB             : 2.0\n"
-            + "  ESTIMATE       : 2.000000004967054\n"
-            + "  UB             : 2.000099863468538\n"
-            + "  OUTOFORDER FLAG: FALSE\n"
-            + "  COUPON COUNT   : 2\n",
+          Joiner.on("\n").join(
+              new Object[]{
+                  "### HLL SKETCH SUMMARY: ",
+                  "  Log Config K   : 12",
+                  "  Hll Target     : HLL_4",
+                  "  Current Mode   : LIST",
+                  "  Memory         : false",
+                  "  LB             : 2.0",
+                  "  Estimate       : 2.000000004967054",
+                  "  UB             : 2.000099863468538",
+                  "  OutOfOrder Flag: false",
+                  "  Coupon Count   : 2",
+                  ""
+              }
+          ),
+          Joiner.on("\n").join(
+              new Object[]{
+                  "### HLL SKETCH SUMMARY: ",
+                  "  LOG CONFIG K   : 12",
+                  "  HLL TARGET     : HLL_4",
+                  "  CURRENT MODE   : LIST",
+                  "  MEMORY         : FALSE",
+                  "  LB             : 2.0",
+                  "  ESTIMATE       : 2.000000004967054",
+                  "  UB             : 2.000099863468538",
+                  "  OUTOFORDER FLAG: FALSE",
+                  "  COUPON COUNT   : 2",
+                  ""
+              }
+          ),
           2.0,
           2L
       };
@@ -242,39 +254,27 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
   ) throws IOException
   {
     HllSketchModule.registerSerde();
-    final QueryableIndex index = IndexBuilder.create()
-                                             
.tmpDir(temporaryFolder.newFolder())
-                                             
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
-                                             .schema(
-                                                 new 
IncrementalIndexSchema.Builder()
-                                                     .withMetrics(
-                                                         new 
CountAggregatorFactory("cnt"),
-                                                         new 
DoubleSumAggregatorFactory("m1", "m1"),
-                                                         new 
HllSketchBuildAggregatorFactory(
-                                                             "hllsketch_dim1",
-                                                             "dim1",
-                                                             null,
-                                                             null,
-                                                             null,
-                                                             false,
-                                                             ROUND
-                                                         ),
-                                                         new 
HllSketchBuildAggregatorFactory(
-                                                             "hllsketch_dim3",
-                                                             "dim3",
-                                                             null,
-                                                             null,
-                                                             null,
-                                                             false,
-                                                             false
-                                                         )
-                                                     )
-                                                     .withRollup(false)
-                                                     .build()
-                                             )
-                                             .rows(TestDataBuilder.ROWS1)
-                                             .buildMMappedIndex();
-
+    final QueryableIndex index = IndexBuilder
+        .create()
+        .tmpDir(temporaryFolder.newFolder())
+        
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .schema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(
+                    new CountAggregatorFactory("cnt"),
+                    new DoubleSumAggregatorFactory("m1", "m1"),
+                    new HllSketchBuildAggregatorFactory("hllsketch_dim1", 
"dim1", null, null, null, false, ROUND),
+                    new HllSketchBuildAggregatorFactory("hllsketch_dim3", 
"dim3", null, null, null, false, false),
+                    new HllSketchBuildAggregatorFactory("hllsketch_m1", "m1", 
null, null, null, false, ROUND),
+                    new HllSketchBuildAggregatorFactory("hllsketch_f1", "f1", 
null, null, null, false, ROUND),
+                    new HllSketchBuildAggregatorFactory("hllsketch_l1", "l1", 
null, null, null, false, ROUND),
+                    new HllSketchBuildAggregatorFactory("hllsketch_d1", "d1", 
null, null, null, false, ROUND)
+                )
+                .withRollup(false)
+                .build()
+        )
+        .rows(TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS)
+        .buildMMappedIndex();
 
     return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
         DataSegment.builder()
@@ -473,7 +473,7 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
             GroupByQuery.builder()
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setDataSource(CalciteTests.DATASOURCE1)
-                        .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "d0")))
+                        .setDimensions(dimensions(new 
DefaultDimensionSpec("dim2", "_d0")))
                         .setGranularity(Granularities.ALL)
                         .setAggregatorSpecs(
                             aggregators(
@@ -518,7 +518,7 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
                         .setInterval(new 
MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
                         .setGranularity(Granularities.ALL)
                         
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
-                        .setDimensions(new DefaultDimensionSpec("cnt", "d0", 
ColumnType.LONG))
+                        .setDimensions(new DefaultDimensionSpec("cnt", "_d0", 
ColumnType.LONG))
                         .setAggregatorSpecs(EXPECTED_FILTERED_AGGREGATORS)
                         
.setPostAggregatorSpecs(EXPECTED_FILTERED_POST_AGGREGATORS)
                         .setContext(QUERY_CONTEXT_DEFAULT)
@@ -613,7 +613,7 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
                         .setInterval(new 
MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
                         .setGranularity(Granularities.ALL)
                         
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
-                        .setDimensions(new DefaultDimensionSpec("cnt", "d0", 
ColumnType.LONG))
+                        .setDimensions(new DefaultDimensionSpec("cnt", "_d0", 
ColumnType.LONG))
                         .setAggregatorSpecs(EXPECTED_PA_AGGREGATORS)
                         .setPostAggregatorSpecs(EXPECTED_PA_POST_AGGREGATORS)
                         .setContext(QUERY_CONTEXT_DEFAULT)
@@ -898,7 +898,7 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
                         .setDimFilter(selector("dim2", "a", null))
                         .setGranularity(Granularities.ALL)
                         .setVirtualColumns(expressionVirtualColumn("v0", 
"'a'", ColumnType.STRING))
-                        .setDimensions(new DefaultDimensionSpec("v0", "d0", 
ColumnType.STRING))
+                        .setDimensions(new DefaultDimensionSpec("v0", "_d0", 
ColumnType.STRING))
                         .setAggregatorSpecs(
                             aggregators(
                                 new FilteredAggregatorFactory(
@@ -957,7 +957,7 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
                         .setDimFilter(selector("dim2", "a", null))
                         .setGranularity(Granularities.ALL)
                         .setVirtualColumns(expressionVirtualColumn("v0", 
"'a'", ColumnType.STRING))
-                        .setDimensions(new DefaultDimensionSpec("v0", "d0", 
ColumnType.STRING))
+                        .setDimensions(new DefaultDimensionSpec("v0", "_d0", 
ColumnType.STRING))
                         .setAggregatorSpecs(
                             aggregators(
                                 new FilteredAggregatorFactory(
@@ -982,30 +982,33 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
   {
     testQuery(
         "SELECT"
-        + " HLL_SKETCH_ESTIMATE(hllsketch_dim1)"
+        + " HLL_SKETCH_ESTIMATE(hllsketch_dim1),"
+        + " HLL_SKETCH_ESTIMATE(hllsketch_d1),"
+        + " HLL_SKETCH_ESTIMATE(hllsketch_l1),"
+        + " HLL_SKETCH_ESTIMATE(hllsketch_f1)"
         + " FROM druid.foo",
         ImmutableList.of(
             newScanQueryBuilder()
                 .dataSource(CalciteTests.DATASOURCE1)
                 .intervals(querySegmentSpec(Filtration.eternity()))
-                .virtualColumns(new ExpressionVirtualColumn(
-                    "v0",
-                    "hll_sketch_estimate(\"hllsketch_dim1\")",
-                    ColumnType.DOUBLE,
-                    MACRO_TABLE
-                ))
+                .virtualColumns(
+                    makeSketchEstimateExpression("v0", "hllsketch_dim1"),
+                    makeSketchEstimateExpression("v1", "hllsketch_d1"),
+                    makeSketchEstimateExpression("v2", "hllsketch_l1"),
+                    makeSketchEstimateExpression("v3", "hllsketch_f1")
+                )
                 
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                .columns("v0")
+                .columns("v0", "v1", "v2", "v3")
                 .context(QUERY_CONTEXT_DEFAULT)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{0.0D},
-            new Object[]{1.0D},
-            new Object[]{1.0D},
-            new Object[]{1.0D},
-            new Object[]{1.0D},
-            new Object[]{1.0D}
+            new Object[]{0.0D, 1.0D, 1.0D, 1.0D},
+            new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
+            new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
+            new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
+            new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
+            new Object[]{1.0D, 0.0D, 0.0D, 0.0D}
         )
     );
   }
@@ -1097,14 +1100,9 @@ public class HllSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setDataSource(CalciteTests.DATASOURCE1)
                         .setGranularity(Granularities.ALL)
-                        .setVirtualColumns(new ExpressionVirtualColumn(
-                            "v0",
-                            "hll_sketch_estimate(\"hllsketch_dim1\")",
-                            ColumnType.DOUBLE,
-                            MACRO_TABLE
-                        ))
+                        .setVirtualColumns(makeSketchEstimateExpression("v0", 
"hllsketch_dim1"))
                         .setDimensions(
-                            new DefaultDimensionSpec("v0", "d0", 
ColumnType.DOUBLE))
+                            new DefaultDimensionSpec("v0", "_d0", 
ColumnType.DOUBLE))
                         .setAggregatorSpecs(
                             aggregators(
                                 new CountAggregatorFactory("a0")
@@ -1146,13 +1144,8 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
                 .dataSource(CalciteTests.DATASOURCE1)
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .granularity(Granularities.ALL)
-                .dimension(new DefaultDimensionSpec("v0", "d0", 
ColumnType.DOUBLE))
-                .virtualColumns(new ExpressionVirtualColumn(
-                    "v0",
-                    "hll_sketch_estimate(\"hllsketch_dim1\")",
-                    ColumnType.DOUBLE,
-                    MACRO_TABLE
-                ))
+                .dimension(new DefaultDimensionSpec("v0", "_d0", 
ColumnType.DOUBLE))
+                .virtualColumns(makeSketchEstimateExpression("v0", 
"hllsketch_dim1"))
                 .metric(new InvertedTopNMetricSpec(new 
NumericTopNMetricSpec("a0")))
                 .threshold(2)
                 .aggregators(new CountAggregatorFactory("a0"))
@@ -1165,4 +1158,115 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
         )
     );
   }
+
+  /**
+   * This is an extremely subtle test, so we explain with a comment.  The `m1` column in the input data looks like
+   * `["1.0", "2.0", "3.0", "4.0", "5.0", "6.0"]` while the `d1` column looks like
+   * `[1.0, 1.7, 0.0]`.  That is, `m1` is numbers-as-strings, while `d1` is numbers-as-numbers.  If you take the
+   * uniques across both columns, you expect no overlap, so 9 entries.  However, if the `1.0` from `d1` gets
+   * converted into `"1.0"` or vice-versa, the result can become 8 because then the sketch will hash the same
+   * value multiple times, treating them as duplicates.  This test validates that the aggregator properly builds
+   * the sketches preserving the initial type of the data as it came in.  Specifically, the test was added when
+   * a code change caused the 1.0 to get converted to a String such that the resulting value of the query was 8
+   * instead of 9.
+   */
+  @Test
+  public void testEstimateStringAndDoubleAreDifferent()
+  {
+    testQuery(
+        "SELECT"
+        + " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1), 
DS_HLL(hllsketch_m1)), true)"
+        + " FROM druid.foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(
+                      new HllSketchMergeAggregatorFactory("a0", 
"hllsketch_d1", null, null, null, false, true),
+                      new HllSketchMergeAggregatorFactory("a1", 
"hllsketch_m1", null, null, null, false, true)
+                  )
+                  .postAggregators(
+                      new HllSketchToEstimatePostAggregator(
+                          "p3",
+                          new HllSketchUnionPostAggregator(
+                              "p2",
+                              Arrays.asList(
+                                  new FieldAccessPostAggregator("p0", "a0"),
+                                  new FieldAccessPostAggregator("p1", "a1")
+                              ),
+                              null,
+                              null
+                          ),
+                          true
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{9.0D}
+        )
+    );
+  }
+
+  /**
+   * This is a test in a similar vein to {@link #testEstimateStringAndDoubleAreDifferent()} except here we are
+   * ensuring that float values and double values are considered equivalent.  The expected initial inputs were
+   * <p>
+   * 1. d1 -> [1.0, 1.7, 0.0]
+   * 2. f1 -> [1.0f, 0.1f, 0.0f]
+   * <p>
+   * If we assume that doubles and floats are the same, that means that there are 4 unique values, not 6.
+   */
+  @Test
+  public void testFloatAndDoubleAreConsideredTheSame()
+  {
+    // This is a test in a similar vein to testEstimateStringAndDoubleAreDifferent above
+    testQuery(
+        "SELECT"
+        + " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1), 
DS_HLL(hllsketch_f1)), true)"
+        + " FROM druid.foo",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .aggregators(
+                      new HllSketchMergeAggregatorFactory("a0", 
"hllsketch_d1", null, null, null, false, true),
+                      new HllSketchMergeAggregatorFactory("a1", 
"hllsketch_f1", null, null, null, false, true)
+                  )
+                  .postAggregators(
+                      new HllSketchToEstimatePostAggregator(
+                          "p3",
+                          new HllSketchUnionPostAggregator(
+                              "p2",
+                              Arrays.asList(
+                                  new FieldAccessPostAggregator("p0", "a0"),
+                                  new FieldAccessPostAggregator("p1", "a1")
+                              ),
+                              null,
+                              null
+                          ),
+                          true
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{4.0D}
+        )
+    );
+  }
+
+  private ExpressionVirtualColumn makeSketchEstimateExpression(String 
outputName, String field)
+  {
+    return new ExpressionVirtualColumn(
+        outputName,
+        StringUtils.format("hll_sketch_estimate(\"%s\")", field),
+        ColumnType.DOUBLE,
+        MACRO_TABLE
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to