This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 66cac08a52 Refactor HllSketchBuildAggregatorFactory (#14544)
66cac08a52 is described below
commit 66cac08a52cc357a4c775dbb64919941f7733c57
Author: imply-cheddar <[email protected]>
AuthorDate: Tue Jul 11 01:57:09 2023 +0900
Refactor HllSketchBuildAggregatorFactory (#14544)
* Refactor HllSketchBuildAggregatorFactory
The usage of ColumnProcessors and HllSketchBuildColumnProcessorFactory
made it very difficult to figure out what was going on from just looking
at the AggregatorFactory or Aggregator code. It also didn't properly
double check that you could use UTF8 ahead of time, even though it's
entirely possible to validate it before trying to use it. This refactor
keeps the general indirection that had been implemented by
the Consumer<Supplier<HllSketch>> but centralizes the decision logic and
makes it easier to understand the code.
* Test fixes
* Add test that validates the types are maintained
* Add back indirection to avoid buffer calls
* Ensure floats and doubles are treated as the same thing
* Static checks
---
.../datasketches/hll/HllSketchBuildAggregator.java | 11 +-
.../hll/HllSketchBuildAggregatorFactory.java | 99 ++++++--
.../hll/HllSketchBuildBufferAggregator.java | 14 +-
.../hll/HllSketchBuildColumnProcessorFactory.java | 110 --------
.../datasketches/hll/HllSketchUpdater.java | 29 +++
.../hll/sql/HllSketchSqlAggregatorTest.java | 276 ++++++++++++++-------
6 files changed, 307 insertions(+), 232 deletions(-)
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
index 7a086b7257..211078bd75 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java
@@ -23,25 +23,22 @@ import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.query.aggregation.Aggregator;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or
any numeric type.
*/
public class HllSketchBuildAggregator implements Aggregator
{
- private final Consumer<Supplier<HllSketch>> processor;
+ private final HllSketchUpdater updater;
private HllSketch sketch;
public HllSketchBuildAggregator(
- final Consumer<Supplier<HllSketch>> processor,
+ final HllSketchUpdater updater,
final int lgK,
final TgtHllType tgtHllType
)
{
- this.processor = processor;
+ this.updater = updater;
this.sketch = new HllSketch(lgK, tgtHllType);
}
@@ -53,7 +50,7 @@ public class HllSketchBuildAggregator implements Aggregator
@Override
public synchronized void aggregate()
{
- processor.accept(() -> sketch);
+ updater.update(() -> sketch);
}
/*
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
index 2762d007b2..35370648f0 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java
@@ -30,22 +30,25 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.ColumnInspector;
-import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.nio.ByteBuffer;
/**
* This aggregator factory is for building sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or
any numeric type.
*/
+@SuppressWarnings("NullableProblems")
public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory
{
public static final ColumnType TYPE =
ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME);
@@ -80,16 +83,8 @@ public class HllSketchBuildAggregatorFactory extends
HllSketchAggregatorFactory
@Override
public Aggregator factorize(final ColumnSelectorFactory
columnSelectorFactory)
{
-
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
-
- final Consumer<Supplier<HllSketch>> processor =
ColumnProcessors.makeProcessor(
- getFieldName(),
- new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
- columnSelectorFactory
- );
-
return new HllSketchBuildAggregator(
- processor,
+ formulateSketchUpdater(columnSelectorFactory),
getLgK(),
TgtHllType.valueOf(getTgtHllType())
);
@@ -98,16 +93,8 @@ public class HllSketchBuildAggregatorFactory extends
HllSketchAggregatorFactory
@Override
public BufferAggregator factorizeBuffered(final ColumnSelectorFactory
columnSelectorFactory)
{
-
validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName()));
-
- final Consumer<Supplier<HllSketch>> processor =
ColumnProcessors.makeProcessor(
- getFieldName(),
- new HllSketchBuildColumnProcessorFactory(getStringEncoding()),
- columnSelectorFactory
- );
-
return new HllSketchBuildBufferAggregator(
- processor,
+ formulateSketchUpdater(columnSelectorFactory),
getLgK(),
TgtHllType.valueOf(getTgtHllType()),
getStringEncoding(),
@@ -175,4 +162,74 @@ public class HllSketchBuildAggregatorFactory extends
HllSketchAggregatorFactory
}
}
+ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory
columnSelectorFactory)
+ {
+ final ColumnCapabilities capabilities =
columnSelectorFactory.getColumnCapabilities(getFieldName());
+ validateInputs(capabilities);
+
+ HllSketchUpdater updater = null;
+ if (capabilities != null &&
+ StringEncoding.UTF8.equals(getStringEncoding()) &&
ValueType.STRING.equals(capabilities.getType())) {
+ final DimensionSelector selector =
columnSelectorFactory.makeDimensionSelector(
+ DefaultDimensionSpec.of(getFieldName())
+ );
+
+ if (selector.supportsLookupNameUtf8()) {
+ updater = sketch -> {
+ final IndexedInts row = selector.getRow();
+ final int sz = row.size();
+
+ for (int i = 0; i < sz; i++) {
+ final ByteBuffer buf = selector.lookupNameUtf8(row.get(i));
+
+ if (buf != null) {
+ sketch.get().update(buf);
+ }
+ }
+ };
+ }
+ }
+
+ if (updater == null) {
+ @SuppressWarnings("unchecked")
+ final ColumnValueSelector<Object> selector =
columnSelectorFactory.makeColumnValueSelector(getFieldName());
+ final ValueType type;
+
+ if (capabilities == null) {
+ // When ingesting data, the columnSelectorFactory returns null for
column capabilities, so this doesn't
+ // necessarily mean that the column doesn't exist. We thus need to be
prepared to accept anything in this
+ // case. As such, we pretend like the input is COMPLEX to get the
logic to use the object-based aggregation
+ type = ValueType.COMPLEX;
+ } else {
+ type = capabilities.getType();
+ }
+
+
+ switch (type) {
+ case LONG:
+ updater = sketch -> {
+ if (!selector.isNull()) {
+ sketch.get().update(selector.getLong());
+ }
+ };
+ break;
+ case FLOAT:
+ case DOUBLE:
+ updater = sketch -> {
+ if (!selector.isNull()) {
+ sketch.get().update(selector.getDouble());
+ }
+ };
+ break;
+ default:
+ updater = sketch -> {
+ Object obj = selector.getObject();
+ if (obj != null) {
+ HllSketchBuildUtil.updateSketch(sketch.get(),
getStringEncoding(), obj);
+ }
+ };
+ }
+ }
+ return updater;
+ }
}
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 10cde4aa25..623bab2df5 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -19,35 +19,33 @@
package org.apache.druid.query.aggregation.datasketches.hll;
-import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
/**
* This aggregator builds sketches from raw data.
* The input column can contain identifiers of type string, char[], byte[] or
any numeric type.
*/
+@SuppressWarnings("NullableProblems")
public class HllSketchBuildBufferAggregator implements BufferAggregator
{
- private final Consumer<Supplier<HllSketch>> processor;
+ private final HllSketchUpdater updater;
private final HllSketchBuildBufferAggregatorHelper helper;
private final StringEncoding stringEncoding;
public HllSketchBuildBufferAggregator(
- final Consumer<Supplier<HllSketch>> processor,
+ final HllSketchUpdater updater,
final int lgK,
final TgtHllType tgtHllType,
final StringEncoding stringEncoding,
final int size
)
{
- this.processor = processor;
+ this.updater = updater;
this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType,
size);
this.stringEncoding = stringEncoding;
}
@@ -61,7 +59,7 @@ public class HllSketchBuildBufferAggregator implements
BufferAggregator
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
- processor.accept(() -> helper.getSketchAtPosition(buf, position));
+ updater.update(() -> helper.getSketchAtPosition(buf, position));
}
@Override
@@ -101,7 +99,7 @@ public class HllSketchBuildBufferAggregator implements
BufferAggregator
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
- inspector.visit("processor", processor);
+ inspector.visit("processor", updater);
// lgK should be inspected because different execution paths exist in
HllSketch.update() that is called from
// @CalledFromHotLoop-annotated aggregate() depending on the lgK.
// See https://github.com/apache/druid/pull/6893#discussion_r250726028
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
deleted file mode 100644
index d082388957..0000000000
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.aggregation.datasketches.hll;
-
-import org.apache.datasketches.hll.HllSketch;
-import org.apache.druid.java.util.common.StringEncoding;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-import org.apache.druid.segment.BaseObjectColumnValueSelector;
-import org.apache.druid.segment.ColumnProcessorFactory;
-import org.apache.druid.segment.DimensionSelector;
-import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.data.IndexedInts;
-
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/**
- * Scalar (non-vectorized) column processor factory.
- */
-public class HllSketchBuildColumnProcessorFactory implements
ColumnProcessorFactory<Consumer<Supplier<HllSketch>>>
-{
- private final StringEncoding stringEncoding;
-
- HllSketchBuildColumnProcessorFactory(StringEncoding stringEncoding)
- {
- this.stringEncoding = stringEncoding;
- }
-
- @Override
- public ColumnType defaultType()
- {
- return ColumnType.STRING;
- }
-
- @Override
- public Consumer<Supplier<HllSketch>>
makeDimensionProcessor(DimensionSelector selector, boolean multiValue)
- {
- return sketch -> {
- final IndexedInts row = selector.getRow();
- final int sz = row.size();
-
- for (int i = 0; i < sz; i++) {
- HllSketchBuildUtil.updateSketchWithDictionarySelector(sketch.get(),
stringEncoding, selector, row.get(i));
- }
- };
- }
-
- @Override
- public Consumer<Supplier<HllSketch>>
makeFloatProcessor(BaseFloatColumnValueSelector selector)
- {
- return sketch -> {
- if (!selector.isNull()) {
- // Important that this is *double* typed, since
HllSketchBuildAggregator treats doubles and floats the same.
- final double value = selector.getFloat();
- sketch.get().update(value);
- }
- };
- }
-
- @Override
- public Consumer<Supplier<HllSketch>>
makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
- {
- return sketch -> {
- if (!selector.isNull()) {
- sketch.get().update(selector.getDouble());
- }
- };
- }
-
- @Override
- public Consumer<Supplier<HllSketch>>
makeLongProcessor(BaseLongColumnValueSelector selector)
- {
- return sketch -> {
- if (!selector.isNull()) {
- sketch.get().update(selector.getLong());
- }
- };
- }
-
- @Override
- public Consumer<Supplier<HllSketch>>
makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
- {
- return sketch -> {
- final Object o = selector.getObject();
-
- if (o != null) {
- HllSketchBuildUtil.updateSketch(sketch.get(), stringEncoding, o);
- }
- };
- }
-}
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
new file mode 100644
index 0000000000..146bf1055d
--- /dev/null
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUpdater.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.hll;
+
+import org.apache.datasketches.hll.HllSketch;
+
+import java.util.function.Supplier;
+
+public interface HllSketchUpdater
+{
+ void update(Supplier<HllSketch> sketch);
+}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
index 498bb06d9a..1431aa49a9 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.datasketches.hll.sql;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
@@ -46,6 +47,7 @@ import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimatePostAggregator;
import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToEstimateWithBoundsPostAggregator;
import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchToStringPostAggregator;
+import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchUnionPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
@@ -109,26 +111,36 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
"[2.000000004967054,2.0,2.000099863468538]",
"\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"",
2L,
- "### HLL SKETCH SUMMARY: \n"
- + " Log Config K : 12\n"
- + " Hll Target : HLL_4\n"
- + " Current Mode : LIST\n"
- + " Memory : false\n"
- + " LB : 2.0\n"
- + " Estimate : 2.000000004967054\n"
- + " UB : 2.000099863468538\n"
- + " OutOfOrder Flag: false\n"
- + " Coupon Count : 2\n",
- "### HLL SKETCH SUMMARY: \n"
- + " LOG CONFIG K : 12\n"
- + " HLL TARGET : HLL_4\n"
- + " CURRENT MODE : LIST\n"
- + " MEMORY : FALSE\n"
- + " LB : 2.0\n"
- + " ESTIMATE : 2.000000004967054\n"
- + " UB : 2.000099863468538\n"
- + " OUTOFORDER FLAG: FALSE\n"
- + " COUPON COUNT : 2\n",
+ Joiner.on("\n").join(
+ new Object[]{
+ "### HLL SKETCH SUMMARY: ",
+ " Log Config K : 12",
+ " Hll Target : HLL_4",
+ " Current Mode : LIST",
+ " Memory : false",
+ " LB : 2.0",
+ " Estimate : 2.000000004967054",
+ " UB : 2.000099863468538",
+ " OutOfOrder Flag: false",
+ " Coupon Count : 2",
+ ""
+ }
+ ),
+ Joiner.on("\n").join(
+ new Object[]{
+ "### HLL SKETCH SUMMARY: ",
+ " LOG CONFIG K : 12",
+ " HLL TARGET : HLL_4",
+ " CURRENT MODE : LIST",
+ " MEMORY : FALSE",
+ " LB : 2.0",
+ " ESTIMATE : 2.000000004967054",
+ " UB : 2.000099863468538",
+ " OUTOFORDER FLAG: FALSE",
+ " COUPON COUNT : 2",
+ ""
+ }
+ ),
2.0,
2L
};
@@ -242,39 +254,27 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
) throws IOException
{
HllSketchModule.registerSerde();
- final QueryableIndex index = IndexBuilder.create()
-
.tmpDir(temporaryFolder.newFolder())
-
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new
IncrementalIndexSchema.Builder()
- .withMetrics(
- new
CountAggregatorFactory("cnt"),
- new
DoubleSumAggregatorFactory("m1", "m1"),
- new
HllSketchBuildAggregatorFactory(
- "hllsketch_dim1",
- "dim1",
- null,
- null,
- null,
- false,
- ROUND
- ),
- new
HllSketchBuildAggregatorFactory(
- "hllsketch_dim3",
- "dim3",
- null,
- null,
- null,
- false,
- false
- )
- )
- .withRollup(false)
- .build()
- )
- .rows(TestDataBuilder.ROWS1)
- .buildMMappedIndex();
-
+ final QueryableIndex index = IndexBuilder
+ .create()
+ .tmpDir(temporaryFolder.newFolder())
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HllSketchBuildAggregatorFactory("hllsketch_dim1",
"dim1", null, null, null, false, ROUND),
+ new HllSketchBuildAggregatorFactory("hllsketch_dim3",
"dim3", null, null, null, false, false),
+ new HllSketchBuildAggregatorFactory("hllsketch_m1", "m1",
null, null, null, false, ROUND),
+ new HllSketchBuildAggregatorFactory("hllsketch_f1", "f1",
null, null, null, false, ROUND),
+ new HllSketchBuildAggregatorFactory("hllsketch_l1", "l1",
null, null, null, false, ROUND),
+ new HllSketchBuildAggregatorFactory("hllsketch_d1", "d1",
null, null, null, false, ROUND)
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS)
+ .buildMMappedIndex();
return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
@@ -473,7 +473,7 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
GroupByQuery.builder()
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDataSource(CalciteTests.DATASOURCE1)
- .setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "d0")))
+ .setDimensions(dimensions(new
DefaultDimensionSpec("dim2", "_d0")))
.setGranularity(Granularities.ALL)
.setAggregatorSpecs(
aggregators(
@@ -518,7 +518,7 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.setInterval(new
MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
- .setDimensions(new DefaultDimensionSpec("cnt", "d0",
ColumnType.LONG))
+ .setDimensions(new DefaultDimensionSpec("cnt", "_d0",
ColumnType.LONG))
.setAggregatorSpecs(EXPECTED_FILTERED_AGGREGATORS)
.setPostAggregatorSpecs(EXPECTED_FILTERED_POST_AGGREGATORS)
.setContext(QUERY_CONTEXT_DEFAULT)
@@ -613,7 +613,7 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.setInterval(new
MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS))
- .setDimensions(new DefaultDimensionSpec("cnt", "d0",
ColumnType.LONG))
+ .setDimensions(new DefaultDimensionSpec("cnt", "_d0",
ColumnType.LONG))
.setAggregatorSpecs(EXPECTED_PA_AGGREGATORS)
.setPostAggregatorSpecs(EXPECTED_PA_POST_AGGREGATORS)
.setContext(QUERY_CONTEXT_DEFAULT)
@@ -898,7 +898,7 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0",
"'a'", ColumnType.STRING))
- .setDimensions(new DefaultDimensionSpec("v0", "d0",
ColumnType.STRING))
+ .setDimensions(new DefaultDimensionSpec("v0", "_d0",
ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
@@ -957,7 +957,7 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.setDimFilter(selector("dim2", "a", null))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0",
"'a'", ColumnType.STRING))
- .setDimensions(new DefaultDimensionSpec("v0", "d0",
ColumnType.STRING))
+ .setDimensions(new DefaultDimensionSpec("v0", "_d0",
ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
@@ -982,30 +982,33 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
{
testQuery(
"SELECT"
- + " HLL_SKETCH_ESTIMATE(hllsketch_dim1)"
+ + " HLL_SKETCH_ESTIMATE(hllsketch_dim1),"
+ + " HLL_SKETCH_ESTIMATE(hllsketch_d1),"
+ + " HLL_SKETCH_ESTIMATE(hllsketch_l1),"
+ + " HLL_SKETCH_ESTIMATE(hllsketch_f1)"
+ " FROM druid.foo",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
- .virtualColumns(new ExpressionVirtualColumn(
- "v0",
- "hll_sketch_estimate(\"hllsketch_dim1\")",
- ColumnType.DOUBLE,
- MACRO_TABLE
- ))
+ .virtualColumns(
+ makeSketchEstimateExpression("v0", "hllsketch_dim1"),
+ makeSketchEstimateExpression("v1", "hllsketch_d1"),
+ makeSketchEstimateExpression("v2", "hllsketch_l1"),
+ makeSketchEstimateExpression("v3", "hllsketch_f1")
+ )
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
- .columns("v0")
+ .columns("v0", "v1", "v2", "v3")
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
- new Object[]{0.0D},
- new Object[]{1.0D},
- new Object[]{1.0D},
- new Object[]{1.0D},
- new Object[]{1.0D},
- new Object[]{1.0D}
+ new Object[]{0.0D, 1.0D, 1.0D, 1.0D},
+ new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
+ new Object[]{1.0D, 1.0D, 1.0D, 1.0D},
+ new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
+ new Object[]{1.0D, 0.0D, 0.0D, 0.0D},
+ new Object[]{1.0D, 0.0D, 0.0D, 0.0D}
)
);
}
@@ -1097,14 +1100,9 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDataSource(CalciteTests.DATASOURCE1)
.setGranularity(Granularities.ALL)
- .setVirtualColumns(new ExpressionVirtualColumn(
- "v0",
- "hll_sketch_estimate(\"hllsketch_dim1\")",
- ColumnType.DOUBLE,
- MACRO_TABLE
- ))
+ .setVirtualColumns(makeSketchEstimateExpression("v0",
"hllsketch_dim1"))
.setDimensions(
- new DefaultDimensionSpec("v0", "d0",
ColumnType.DOUBLE))
+ new DefaultDimensionSpec("v0", "_d0",
ColumnType.DOUBLE))
.setAggregatorSpecs(
aggregators(
new CountAggregatorFactory("a0")
@@ -1146,13 +1144,8 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
- .dimension(new DefaultDimensionSpec("v0", "d0",
ColumnType.DOUBLE))
- .virtualColumns(new ExpressionVirtualColumn(
- "v0",
- "hll_sketch_estimate(\"hllsketch_dim1\")",
- ColumnType.DOUBLE,
- MACRO_TABLE
- ))
+ .dimension(new DefaultDimensionSpec("v0", "_d0",
ColumnType.DOUBLE))
+ .virtualColumns(makeSketchEstimateExpression("v0",
"hllsketch_dim1"))
.metric(new InvertedTopNMetricSpec(new
NumericTopNMetricSpec("a0")))
.threshold(2)
.aggregators(new CountAggregatorFactory("a0"))
@@ -1165,4 +1158,115 @@ public class HllSketchSqlAggregatorTest extends
BaseCalciteQueryTest
)
);
}
+
+ /**
+ * This is an extremely subtle test, so we explain with a comment. The `m1`
column in the input data looks like
+ * `["1.0", "2.0", "3.0", "4.0", "5.0", "6.0"]` while the `d1` column looks
like
+ * `[1.0, 1.7, 0.0]`. That is, "m1" is numbers-as-strings, while d1 is
numbers-as-numbers. If you take the
+ * uniques across both columns, you expect no overlap, so 9 entries.
However, if the `1.0` from `d1` gets
+ * converted into `"1.0"` or vice-versa, the result can become 8 because
then the sketch will hash the same
+ * value multiple times considering them duplicates. This test validates
that the aggregator properly builds
+ * the sketches preserving the initial type of the data as it came in.
Specifically, the test was added when
+ * a code change caused the 1.0 to get converted to a String such that the
resulting value of the query was 8
+ * instead of 9.
+ */
+ @Test
+ public void testEstimateStringAndDoubleAreDifferent()
+ {
+ testQuery(
+ "SELECT"
+ + " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1),
DS_HLL(hllsketch_m1)), true)"
+ + " FROM druid.foo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(
+ new HllSketchMergeAggregatorFactory("a0",
"hllsketch_d1", null, null, null, false, true),
+ new HllSketchMergeAggregatorFactory("a1",
"hllsketch_m1", null, null, null, false, true)
+ )
+ .postAggregators(
+ new HllSketchToEstimatePostAggregator(
+ "p3",
+ new HllSketchUnionPostAggregator(
+ "p2",
+ Arrays.asList(
+ new FieldAccessPostAggregator("p0", "a0"),
+ new FieldAccessPostAggregator("p1", "a1")
+ ),
+ null,
+ null
+ ),
+ true
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{9.0D}
+ )
+ );
+ }
+
+ /**
+ * This is a test in a similar vein to {@link
#testEstimateStringAndDoubleAreDifferent()} except here we are
+ * ensuring that float values and doubles values are considered equivalent.
The expected initial inputs were
+ * <p>
+ * 1. d1 -> [1.0, 1.7, 0.0]
+ * 2. f1 -> [1.0f, 0.1f, 0.0f]
+ * <p>
+ * If we assume that doubles and floats are the same, that means that there
are 4 unique values, not 6
+ */
+ @Test
+ public void testFloatAndDoubleAreConsideredTheSame()
+ {
+ // This is a test in a similar vein to
testEstimateStringAndDoubleAreDifferent above
+ testQuery(
+ "SELECT"
+ + " HLL_SKETCH_ESTIMATE(HLL_SKETCH_UNION(DS_HLL(hllsketch_d1),
DS_HLL(hllsketch_f1)), true)"
+ + " FROM druid.foo",
+ ImmutableList.of(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .granularity(Granularities.ALL)
+ .aggregators(
+ new HllSketchMergeAggregatorFactory("a0",
"hllsketch_d1", null, null, null, false, true),
+ new HllSketchMergeAggregatorFactory("a1",
"hllsketch_f1", null, null, null, false, true)
+ )
+ .postAggregators(
+ new HllSketchToEstimatePostAggregator(
+ "p3",
+ new HllSketchUnionPostAggregator(
+ "p2",
+ Arrays.asList(
+ new FieldAccessPostAggregator("p0", "a0"),
+ new FieldAccessPostAggregator("p1", "a1")
+ ),
+ null,
+ null
+ ),
+ true
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{4.0D}
+ )
+ );
+ }
+
+ private ExpressionVirtualColumn makeSketchEstimateExpression(String
outputName, String field)
+ {
+ return new ExpressionVirtualColumn(
+ outputName,
+ StringUtils.format("hll_sketch_estimate(\"%s\")", field),
+ ColumnType.DOUBLE,
+ MACRO_TABLE
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]