This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.0 by this push:
new 1a7f91f0ab7 add substituteCombiningFactory implementations for
datasketches aggs (#17314) (#17323)
1a7f91f0ab7 is described below
commit 1a7f91f0ab77d7dbb992fe042425a9921c8d338a
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Oct 10 19:01:05 2024 +0530
add substituteCombiningFactory implementations for datasketches aggs
(#17314) (#17323)
Follow-up to #17214: adds substituteCombiningFactory implementations so that
more datasketches aggregators can match projections, along with some
projection tests for datasketches.
Co-authored-by: Clint Wylie <[email protected]>
---
.../hll/HllSketchAggregatorFactory.java | 8 +-
.../kll/KllSketchAggregatorFactory.java | 17 +
.../quantiles/DoublesSketchAggregatorFactory.java | 19 +
.../theta/SketchAggregatorFactory.java | 17 +
.../theta/SketchMergeAggregatorFactory.java | 20 +
.../ArrayOfDoublesSketchAggregatorFactory.java | 23 +
.../kll/KllDoublesSketchAggregatorFactoryTest.java | 18 +
.../DoublesSketchAggregatorFactoryTest.java | 15 +
.../theta/SketchAggregatorFactoryTest.java | 15 +
.../ArrayOfDoublesSketchAggregatorFactoryTest.java | 17 +
.../druid/segment/DatasketchesProjectionTest.java | 489 +++++++++++++++++++++
.../druid/segment/AggregateProjectionMetadata.java | 1 +
.../druid/segment/CursorFactoryProjectionTest.java | 6 +-
13 files changed, 658 insertions(+), 7 deletions(-)
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
index fe60d3eda44..166f6f7f61f 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
@@ -289,10 +289,10 @@ public abstract class HllSketchAggregatorFactory extends
AggregatorFactory
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory)
preAggregated;
- if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding ==
that.stringEncoding && Objects.equals(
- fieldName,
- that.fieldName
- )) {
+ if (lgK <= that.lgK &&
+ stringEncoding == that.stringEncoding &&
+ Objects.equals(fieldName, that.fieldName)
+ ) {
return getCombiningFactory();
}
return null;
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
index df181cc7dd9..04b6d2d2303 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
@@ -226,6 +226,23 @@ abstract class KllSketchAggregatorFactory<SketchType
extends KllSketch, ValueTyp
return new
CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}
+ @Nullable
+ @Override
+ public AggregatorFactory substituteCombiningFactory(AggregatorFactory
preAggregated)
+ {
+ if (this == preAggregated) {
+ return getCombiningFactory();
+ }
+ if (getClass() != preAggregated.getClass()) {
+ return null;
+ }
+ KllSketchAggregatorFactory<?, ?> that = (KllSketchAggregatorFactory<?, ?>)
preAggregated;
+ if (Objects.equals(fieldName, that.fieldName) && k == that.k &&
maxStreamLength <= that.maxStreamLength) {
+ return getCombiningFactory();
+ }
+ return null;
+ }
+
@Override
public boolean equals(final Object o)
{
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 4778b950a17..acdef51178f 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -424,6 +424,25 @@ public class DoublesSketchAggregatorFactory extends
AggregatorFactory
return new
CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}
+ @Nullable
+ @Override
+ public AggregatorFactory substituteCombiningFactory(AggregatorFactory
preAggregated)
+ {
+ if (this == preAggregated) {
+ return getCombiningFactory();
+ }
+
+ if (getClass() != preAggregated.getClass()) {
+ return null;
+ }
+
+ DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory)
preAggregated;
+ if (k <= that.k && maxStreamLength <= that.getMaxStreamLength() &&
Objects.equals(fieldName, that.fieldName)) {
+ return getCombiningFactory();
+ }
+ return null;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index b24e382ec0a..5a2baec256c 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -49,6 +49,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
public abstract class SketchAggregatorFactory extends AggregatorFactory
{
@@ -266,6 +267,22 @@ public abstract class SketchAggregatorFactory extends
AggregatorFactory
.array();
}
+ @Override
+ public AggregatorFactory substituteCombiningFactory(AggregatorFactory
preAggregated)
+ {
+ if (this == preAggregated) {
+ return getCombiningFactory();
+ }
+ if (getClass() != preAggregated.getClass()) {
+ return null;
+ }
+ SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory)
preAggregated;
+ if (Objects.equals(fieldName, that.fieldName) && size <= that.size) {
+ return getCombiningFactory();
+ }
+ return null;
+ }
+
@Override
public String toString()
{
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
index addf76d38df..8a7dec444a9 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
+import java.util.Objects;
public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
@@ -165,6 +166,25 @@ public class SketchMergeAggregatorFactory extends
SketchAggregatorFactory
);
}
+ @Override
+ public AggregatorFactory substituteCombiningFactory(AggregatorFactory
preAggregated)
+ {
+ if (this == preAggregated) {
+ return getCombiningFactory();
+ }
+ if (getClass() != preAggregated.getClass()) {
+ return null;
+ }
+ SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory)
preAggregated;
+ if (Objects.equals(fieldName, that.fieldName) &&
+ size <= that.size &&
+ isInputThetaSketch == that.isInputThetaSketch
+ ) {
+ return getCombiningFactory();
+ }
+ return null;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
index 3fe939b4ca8..8f0adce1ec3 100644
---
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
+++
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
@@ -307,6 +307,29 @@ public class ArrayOfDoublesSketchAggregatorFactory extends
AggregatorFactory
return ColumnType.DOUBLE;
}
+ @Nullable
+ @Override
+ public AggregatorFactory substituteCombiningFactory(AggregatorFactory
preAggregated)
+ {
+ if (this == preAggregated) {
+ return getCombiningFactory();
+ }
+
+ if (getClass() != preAggregated.getClass()) {
+ return null;
+ }
+
+ ArrayOfDoublesSketchAggregatorFactory that =
(ArrayOfDoublesSketchAggregatorFactory) preAggregated;
+ if (nominalEntries <= that.nominalEntries &&
+ numberOfValues == that.numberOfValues &&
+ Objects.equals(fieldName, that.fieldName) &&
+ Objects.equals(metricColumns, that.metricColumns)
+ ) {
+ return getCombiningFactory();
+ }
+ return null;
+ }
+
@Override
public String toString()
{
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
index c56598e4a72..8389c81ce51 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
@@ -153,4 +153,22 @@ public class KllDoublesSketchAggregatorFactoryTest
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);
}
+
+ @Test
+ public void testCanSubstitute()
+ {
+ AggregatorFactory sketch = new KllDoublesSketchAggregatorFactory("sketch",
"x", 200, null);
+ AggregatorFactory sketch2 = new KllDoublesSketchAggregatorFactory("other",
"x", 200, null);
+ AggregatorFactory sketch3 = new
KllDoublesSketchAggregatorFactory("sketch", "x", 200, 1_000L);
+ AggregatorFactory sketch4 = new
KllDoublesSketchAggregatorFactory("sketch", "y", 200, null);
+ AggregatorFactory sketch5 = new
KllDoublesSketchAggregatorFactory("sketch", "x", 300, null);
+
+ Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+ Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch2));
+ Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch));
+ Assert.assertNotNull(sketch2.substituteCombiningFactory(sketch));
+ Assert.assertNull(sketch.substituteCombiningFactory(sketch3));
+ Assert.assertNull(sketch.substituteCombiningFactory(sketch4));
+ Assert.assertNull(sketch.substituteCombiningFactory(sketch5));
+ }
}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
index f5a15cde242..9af69174eec 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
@@ -201,4 +201,19 @@ public class DoublesSketchAggregatorFactoryTest
ac.fold(new TestDoublesSketchColumnValueSelector());
Assert.assertNotNull(ac.getObject());
}
+
+ @Test
+ public void testCanSubstitute()
+ {
+ final DoublesSketchAggregatorFactory sketch = new
DoublesSketchAggregatorFactory("sketch", "x", 1024, 1000L, null);
+ final DoublesSketchAggregatorFactory sketch2 = new
DoublesSketchAggregatorFactory("other", "x", 1024, 2000L, null);
+ final DoublesSketchAggregatorFactory sketch3 = new
DoublesSketchAggregatorFactory("another", "x", 2048, 1000L, null);
+ final DoublesSketchAggregatorFactory incompatible = new
DoublesSketchAggregatorFactory("incompatible", "y", 1024, 1000L, null);
+
+ Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+ Assert.assertNotNull(sketch.substituteCombiningFactory(sketch3));
+ Assert.assertNull(sketch2.substituteCombiningFactory(sketch3));
+ Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
+ Assert.assertNull(sketch3.substituteCombiningFactory(sketch));
+ }
}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
index 23887652a73..1b2565dde29 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorAndSize;
+import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
@@ -213,4 +214,18 @@ public class SketchAggregatorFactoryTest
Throwable exception = Assert.assertThrows(DruidException.class, () ->
AGGREGATOR_16384.factorizeVector(vectorFactory));
Assert.assertEquals("Unsupported input [x] of type [COMPLEX<json>] for
aggregator [COMPLEX<thetaSketchBuild>].", exception.getMessage());
}
+
+ @Test
+ public void testCanSubstitute()
+ {
+ AggregatorFactory sketch1 = new SketchMergeAggregatorFactory("sketch",
"x", 16, true, false, 2);
+ AggregatorFactory sketch2 = new SketchMergeAggregatorFactory("other", "x",
null, false, false, null);
+ AggregatorFactory sketch3 = new SketchMergeAggregatorFactory("sketch",
"x", null, false, false, 3);
+ AggregatorFactory sketch4 = new SketchMergeAggregatorFactory("sketch",
"y", null, false, false, null);
+
+ Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch2));
+ Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch3));
+ Assert.assertNull(sketch1.substituteCombiningFactory(sketch4));
+ Assert.assertNull(sketch2.substituteCombiningFactory(sketch1));
+ }
}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
index 10f2afe2248..d62c6c61ce7 100644
---
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
@@ -118,4 +118,21 @@ public class ArrayOfDoublesSketchAggregatorFactoryTest
Assert.assertEquals(factory, factory.withName("name"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());
}
+
+ @Test
+ public void testCanSubstitute()
+ {
+ AggregatorFactory sketch = new
ArrayOfDoublesSketchAggregatorFactory("sketch", "x", null, null, null);
+ AggregatorFactory sketch2 = new
ArrayOfDoublesSketchAggregatorFactory("sketch2", "x", null, null, null);
+ AggregatorFactory other = new
ArrayOfDoublesSketchAggregatorFactory("other", "x", 8192, null, null);
+ AggregatorFactory incompatible = new
ArrayOfDoublesSketchAggregatorFactory("incompatible", "x", 2048, null, null);
+ AggregatorFactory incompatible2 = new
ArrayOfDoublesSketchAggregatorFactory("sketch", "y", null, null, null);
+ Assert.assertNotNull(sketch.substituteCombiningFactory(other));
+ Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+ Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
+ Assert.assertNotNull(sketch.substituteCombiningFactory(sketch));
+ Assert.assertNull(other.substituteCombiningFactory(sketch));
+ Assert.assertNull(sketch.substituteCombiningFactory(incompatible2));
+ Assert.assertNull(other.substituteCombiningFactory(incompatible2));
+ }
}
diff --git
a/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java
b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java
new file mode 100644
index 00000000000..9b0af45ef19
--- /dev/null
+++
b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.datasketches.common.Family;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.thetacommon.ThetaUtil;
+import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
+import
org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch;
+import
org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder;
+import org.apache.druid.collections.CloseableDefaultBlockingPool;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchHolder;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
+import
org.apache.druid.query.aggregation.datasketches.kll.KllDoublesSketchAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.kll.KllSketchModule;
+import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
+import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder;
+import
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
+import
org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchAggregatorFactory;
+import
org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Like {@link CursorFactoryProjectionTest}, but exercises projections built with datasketches aggregators.
+ */
+@RunWith(Parameterized.class)
+public class DatasketchesProjectionTest extends InitializedNullHandlingTest
+{
+ private static final Closer CLOSER = Closer.create();
+
+ private static final List<AggregateProjectionSpec> PROJECTIONS =
Collections.singletonList(
+ new AggregateProjectionSpec(
+ "a_projection",
+ VirtualColumns.create(
+ Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+ ),
+ Arrays.asList(
+ new LongDimensionSchema("__gran"),
+ new StringDimensionSchema("a")
+ ),
+ new AggregatorFactory[]{
+ new HllSketchBuildAggregatorFactory("_b_hll", "b", null, null,
null, null, false),
+ new SketchMergeAggregatorFactory("_b_theta", "b", null, null,
false, null),
+ new DoublesSketchAggregatorFactory("_d_doubles", "d", null),
+ new ArrayOfDoublesSketchAggregatorFactory("_bcd_aod", "b", null,
Arrays.asList("c", "d"), null),
+ new KllDoublesSketchAggregatorFactory("_d_kll", "d", null, null)
+ }
+ )
+ );
+
+ private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS =
PROJECTIONS.stream().map(projection -> {
+ return new AggregateProjectionSpec(
+ projection.getName(),
+ projection.getVirtualColumns(),
+ projection.getGroupingColumns()
+ .stream()
+ .map(x -> new AutoTypeColumnSchema(x.getName(), null))
+ .collect(Collectors.toList()),
+ projection.getAggregators()
+ );
+ }).collect(Collectors.toList());
+
+ @Parameterized.Parameters(name = "name: {0}, sortByDim: {3}, autoSchema:
{4}")
+ public static Collection<?> constructorFeeder()
+ {
+ HllSketchModule.registerSerde();
+ TestHelper.JSON_MAPPER.registerModules(new
HllSketchModule().getJacksonModules());
+ SketchModule.registerSerde();
+ TestHelper.JSON_MAPPER.registerModules(new
SketchModule().getJacksonModules());
+ KllSketchModule.registerSerde();
+ TestHelper.JSON_MAPPER.registerModules(new
KllSketchModule().getJacksonModules());
+ DoublesSketchModule.registerSerde();
+ TestHelper.JSON_MAPPER.registerModules(new
DoublesSketchModule().getJacksonModules());
+ ArrayOfDoublesSketchModule.registerSerde();
+ TestHelper.JSON_MAPPER.registerModules(new
ArrayOfDoublesSketchModule().getJacksonModules());
+
+ final List<Object[]> constructors = new ArrayList<>();
+ final DimensionsSpec.Builder dimensionsBuilder =
+ DimensionsSpec.builder()
+ .setDimensions(
+ Arrays.asList(
+ new StringDimensionSchema("a"),
+ new StringDimensionSchema("b"),
+ new LongDimensionSchema("c"),
+ new DoubleDimensionSchema("d"),
+ new FloatDimensionSchema("e")
+ )
+ );
+ DimensionsSpec dimsTimeOrdered = dimensionsBuilder.build();
+ DimensionsSpec dimsOrdered =
dimensionsBuilder.setForceSegmentSortByTime(false).build();
+
+
+ List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
+ .stream()
+ .map(x -> new
AutoTypeColumnSchema(x.getName(), null))
+ .collect(Collectors.toList());
+ for (boolean incremental : new boolean[]{true, false}) {
+ for (boolean sortByDim : new boolean[]{true, false}) {
+ for (boolean autoSchema : new boolean[]{true, false}) {
+ final DimensionsSpec dims;
+ if (sortByDim) {
+ if (autoSchema) {
+ dims = dimsOrdered.withDimensions(autoDims);
+ } else {
+ dims = dimsOrdered;
+ }
+ } else {
+ if (autoSchema) {
+ dims = dimsTimeOrdered.withDimensions(autoDims);
+ } else {
+ dims = dimsTimeOrdered;
+ }
+ }
+ if (incremental) {
+ IncrementalIndex index = CLOSER.register(makeBuilder(dims,
autoSchema).buildIncrementalIndex());
+ constructors.add(new Object[]{
+ "incrementalIndex",
+ new IncrementalIndexCursorFactory(index),
+ new IncrementalIndexTimeBoundaryInspector(index),
+ sortByDim,
+ autoSchema
+ });
+ } else {
+ QueryableIndex index = CLOSER.register(makeBuilder(dims,
autoSchema).buildMMappedIndex());
+ constructors.add(new Object[]{
+ "queryableIndex",
+ new QueryableIndexCursorFactory(index),
+ QueryableIndexTimeBoundaryInspector.create(index),
+ sortByDim,
+ autoSchema
+ });
+ }
+ }
+ }
+ }
+ return constructors;
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException
+ {
+ CLOSER.close();
+ }
+
+ private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec,
boolean autoSchema)
+ {
+ File tmp = FileUtils.createTempDir();
+ CLOSER.register(tmp::delete);
+ return IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dimensionsSpec)
+ .withRollup(false)
+
.withMinTimestamp(CursorFactoryProjectionTest.TIMESTAMP.getMillis())
+ .withProjections(autoSchema ?
AUTO_PROJECTIONS : PROJECTIONS)
+ .build()
+ )
+ .rows(CursorFactoryProjectionTest.ROWS);
+ }
+
+ public final CursorFactory projectionsCursorFactory;
+ public final TimeBoundaryInspector projectionsTimeBoundaryInspector;
+
+ private final GroupingEngine groupingEngine;
+
+ private final NonBlockingPool<ByteBuffer> nonBlockingPool;
+ public final boolean sortByDim;
+ public final boolean autoSchema;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public DatasketchesProjectionTest(
+ String name,
+ CursorFactory projectionsCursorFactory,
+ TimeBoundaryInspector projectionsTimeBoundaryInspector,
+ boolean sortByDim,
+ boolean autoSchema
+ )
+ {
+ this.projectionsCursorFactory = projectionsCursorFactory;
+ this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector;
+ this.sortByDim = sortByDim;
+ this.autoSchema = autoSchema;
+ this.nonBlockingPool = closer.closeLater(
+ new CloseableStupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ () -> ByteBuffer.allocate(1 << 24)
+ )
+ );
+ this.groupingEngine = new GroupingEngine(
+ new DruidProcessingConfig(),
+ GroupByQueryConfig::new,
+ new GroupByResourcesReservationPool(
+ closer.closeLater(
+ new CloseableDefaultBlockingPool<>(
+ () -> ByteBuffer.allocate(1 << 24),
+ 5
+ )
+ ),
+ new GroupByQueryConfig()
+ ),
+ TestHelper.makeJsonMapper(),
+ TestHelper.makeSmileMapper(),
+ (query, future) -> {
+ }
+ );
+ }
+
+ @Test
+ public void testProjectionSingleDim()
+ {
+ // test can use the single dimension projection
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("a")
+ .setAggregatorSpecs(
+ new HllSketchBuildAggregatorFactory("b_distinct", "b",
null, null, null, true, true),
+ new SketchMergeAggregatorFactory("b_distinct_theta",
"b", null, null, null, null),
+ new DoublesSketchAggregatorFactory("d_doubles", "d",
null, null, null),
+ new ArrayOfDoublesSketchAggregatorFactory("b_doubles",
"b", null, Arrays.asList("c", "d"), null),
+ new KllDoublesSketchAggregatorFactory("d", "d", null,
null)
+ )
+ .build();
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder =
projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(3, rowCount);
+ }
+ final Sequence<ResultRow> resultRows = groupingEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> results = resultRows.toList();
+ Assert.assertEquals(2, results.size());
+ List<Object[]> expectedResults = getSingleDimExpected();
+ final RowSignature querySignature =
query.getResultRowSignature(RowSignature.Finalization.NO);
+ for (int i = 0; i < expectedResults.size(); i++) {
+ assertResults(
+ expectedResults.get(i),
+ results.get(i).getArray(),
+ querySignature
+ );
+ }
+ }
+
+ @Test
+ public void testProjectionSingleDimNoProjections()
+ {
+ // test that the projection is bypassed when NO_PROJECTIONS is set: the cursor sees all 8 rows instead of the 3 projection rows
+ final GroupByQuery query =
+ GroupByQuery.builder()
+ .setDataSource("test")
+ .setGranularity(Granularities.ALL)
+ .setInterval(Intervals.ETERNITY)
+ .addDimension("a")
+ .setAggregatorSpecs(
+ new HllSketchBuildAggregatorFactory("b_distinct", "b",
null, null, null, true, true),
+ new SketchMergeAggregatorFactory("b_distinct_theta",
"b", null, null, null, null),
+ new DoublesSketchAggregatorFactory("d_doubles", "d",
null, null, null),
+ new ArrayOfDoublesSketchAggregatorFactory("b_doubles",
"b", null, Arrays.asList("c", "d"), null),
+ new KllDoublesSketchAggregatorFactory("d", "d", null,
null)
+ )
+ .setContext(ImmutableMap.of(QueryContexts.NO_PROJECTIONS,
true))
+ .build();
+ final CursorBuildSpec buildSpec =
GroupingEngine.makeCursorBuildSpec(query, null);
+ try (final CursorHolder cursorHolder =
projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(8, rowCount);
+ }
+ final Sequence<ResultRow> resultRows = groupingEngine.process(
+ query,
+ projectionsCursorFactory,
+ projectionsTimeBoundaryInspector,
+ nonBlockingPool,
+ null
+ );
+ final List<ResultRow> results = resultRows.toList();
+ Assert.assertEquals(2, results.size());
+ List<Object[]> expectedResults = getSingleDimExpected();
+ final RowSignature querySignature =
query.getResultRowSignature(RowSignature.Finalization.NO);
+ for (int i = 0; i < expectedResults.size(); i++) {
+ assertResults(
+ expectedResults.get(i),
+ results.get(i).getArray(),
+ querySignature
+ );
+ }
+ }
+
+ private List<Object[]> getSingleDimExpected()
+ {
+ HllSketch hll1 = new HllSketch(HllSketch.DEFAULT_LG_K);
+ Union theta1 = (Union) SetOperation.builder().build(Family.UNION);
+ DoublesSketch d1 =
DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build();
+ ArrayOfDoublesUpdatableSketch ad1 = new
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES)
+
.setNumberOfValues(2)
+
.build();
+ KllDoublesSketch kll1 = KllDoublesSketch.newHeapInstance();
+ hll1.update("aa");
+ hll1.update("bb");
+ hll1.update("cc");
+ hll1.update("dd");
+ theta1.update("aa");
+ theta1.update("bb");
+ theta1.update("cc");
+ theta1.update("dd");
+ d1.update(1.0);
+ d1.update(1.1);
+ d1.update(2.2);
+ d1.update(1.1);
+ d1.update(2.2);
+ ad1.update("aa", new double[]{1.0, 1.0});
+ ad1.update("bb", new double[]{1.0, 1.1});
+ ad1.update("cc", new double[]{2.0, 2.2});
+ ad1.update("aa", new double[]{1.0, 1.1});
+ ad1.update("dd", new double[]{2.0, 2.2});
+ kll1.update(1.0);
+ kll1.update(1.1);
+ kll1.update(2.2);
+ kll1.update(1.1);
+ kll1.update(2.2);
+ HllSketch hll2 = new HllSketch(HllSketch.DEFAULT_LG_K);
+ Union theta2 = (Union) SetOperation.builder().build(Family.UNION);
+ DoublesSketch d2 =
DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build();
+ ArrayOfDoublesUpdatableSketch ad2 = new
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES)
+
.setNumberOfValues(2)
+
.build();
+ KllDoublesSketch kll2 = KllDoublesSketch.newHeapInstance();
+ hll2.update("aa");
+ hll2.update("bb");
+ theta2.update("aa");
+ theta2.update("bb");
+ d2.update(3.3);
+ d2.update(4.4);
+ d2.update(5.5);
+ ad2.update("aa", new double[]{3.0, 3.3});
+ ad2.update("aa", new double[]{4.0, 4.4});
+ ad2.update("bb", new double[]{5.0, 5.5});
+ kll2.update(3.3);
+ kll2.update(4.4);
+ kll2.update(5.5);
+
+ return Arrays.asList(
+ new Object[]{"a", HllSketchHolder.of(hll1), SketchHolder.of(theta1),
d1, ad1, kll1},
+ new Object[]{"b", HllSketchHolder.of(hll2), SketchHolder.of(theta2),
d2, ad2, kll2}
+ );
+
+ }
+
+ private void assertResults(Object[] expected, Object[] actual, RowSignature
signature)
+ {
+ Assert.assertEquals(expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++) {
+ if
(signature.getColumnType(i).get().equals(ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME)))
{
+ Assert.assertEquals(
+ ((HllSketchHolder) expected[i]).getEstimate(),
+ ((HllSketchHolder) actual[i]).getEstimate(),
+ 0.01
+ );
+ } else if
(signature.getColumnType(i).get().equals(DoublesSketchModule.TYPE)) {
+ Assert.assertEquals(
+ ((DoublesSketch) expected[i]).getMinItem(),
+ ((DoublesSketch) actual[i]).getMinItem(),
+ 0.01
+ );
+ Assert.assertEquals(
+ ((DoublesSketch) expected[i]).getMaxItem(),
+ ((DoublesSketch) actual[i]).getMaxItem(),
+ 0.01
+ );
+ } else if
(signature.getColumnType(i).get().equals(ArrayOfDoublesSketchModule.BUILD_TYPE))
{
+ Assert.assertEquals(
+ ((ArrayOfDoublesSketch) expected[i]).getEstimate(),
+ ((ArrayOfDoublesSketch) actual[i]).getEstimate(),
+ 0.01
+ );
+ Assert.assertEquals(
+ ((ArrayOfDoublesSketch) expected[i]).getLowerBound(0),
+ ((ArrayOfDoublesSketch) actual[i]).getLowerBound(0),
+ 0.01
+ );
+ Assert.assertEquals(
+ ((ArrayOfDoublesSketch) expected[i]).getUpperBound(0),
+ ((ArrayOfDoublesSketch) actual[i]).getUpperBound(0),
+ 0.01
+ );
+ } else if
(signature.getColumnType(i).get().equals(KllSketchModule.DOUBLES_TYPE)) {
+ Assert.assertEquals(
+ ((KllDoublesSketch) expected[i]).getMinItem(),
+ ((KllDoublesSketch) actual[i]).getMinItem(),
+ 0.01
+ );
+ Assert.assertEquals(
+ ((KllDoublesSketch) expected[i]).getMaxItem(),
+ ((KllDoublesSketch) actual[i]).getMaxItem(),
+ 0.01
+ );
+ } else {
+ Assert.assertEquals(expected[i], actual[i]);
+ }
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
index 7712cecb303..1b63d9cb1df 100644
---
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
+++
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
@@ -332,6 +332,7 @@ public class AggregateProjectionMetadata
if (combining != null) {
matchBuilder.remapColumn(queryAgg.getName(),
projectionAgg.getName()).addPreAggregatedAggregator(combining);
foundMatch = true;
+ break;
}
}
allMatch = allMatch && foundMatch;
diff --git
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 446e714f997..7eae70cc1f4 100644
---
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -99,15 +99,15 @@ import java.util.stream.Collectors;
public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
{
private static final Closer CLOSER = Closer.create();
- private static final DateTime TIMESTAMP =
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
+ static final DateTime TIMESTAMP =
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
- private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+ static final RowSignature ROW_SIGNATURE = RowSignature.builder()
.add("a",
ColumnType.STRING)
.add("b",
ColumnType.STRING)
.add("c",
ColumnType.LONG)
.add("d",
ColumnType.DOUBLE)
.build();
- private static final List<InputRow> ROWS = Arrays.asList(
+ static final List<InputRow> ROWS = Arrays.asList(
new ListBasedInputRow(
ROW_SIGNATURE,
TIMESTAMP,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]