This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.0 by this push:
     new 1a7f91f0ab7 add substituteCombiningFactory implementations for datasketches aggs (#17314) (#17323)
1a7f91f0ab7 is described below

commit 1a7f91f0ab77d7dbb992fe042425a9921c8d338a
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Oct 10 19:01:05 2024 +0530

    add substituteCombiningFactory implementations for datasketches aggs (#17314) (#17323)
    
    Follow-up to #17214: adds implementations of substituteCombiningFactory so that more
    datasketches aggs can match projections, along with some projection tests for datasketches.
    
    Co-authored-by: Clint Wylie <[email protected]>
---
 .../hll/HllSketchAggregatorFactory.java            |   8 +-
 .../kll/KllSketchAggregatorFactory.java            |  17 +
 .../quantiles/DoublesSketchAggregatorFactory.java  |  19 +
 .../theta/SketchAggregatorFactory.java             |  17 +
 .../theta/SketchMergeAggregatorFactory.java        |  20 +
 .../ArrayOfDoublesSketchAggregatorFactory.java     |  23 +
 .../kll/KllDoublesSketchAggregatorFactoryTest.java |  18 +
 .../DoublesSketchAggregatorFactoryTest.java        |  15 +
 .../theta/SketchAggregatorFactoryTest.java         |  15 +
 .../ArrayOfDoublesSketchAggregatorFactoryTest.java |  17 +
 .../druid/segment/DatasketchesProjectionTest.java  | 489 +++++++++++++++++++++
 .../druid/segment/AggregateProjectionMetadata.java |   1 +
 .../druid/segment/CursorFactoryProjectionTest.java |   6 +-
 13 files changed, 658 insertions(+), 7 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
index fe60d3eda44..166f6f7f61f 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
@@ -289,10 +289,10 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory
       return null;
     }
     HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
-    if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
-        fieldName,
-        that.fieldName
-    )) {
+    if (lgK <= that.lgK && // tgtHllType is no longer compared; stored sketches only need lgK >= the query's lgK
+        stringEncoding == that.stringEncoding &&
+        Objects.equals(fieldName, that.fieldName)
+    ) {
       return getCombiningFactory();
     }
     return null;
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
index df181cc7dd9..04b6d2d2303 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java
@@ -226,6 +226,23 @@ abstract class KllSketchAggregatorFactory<SketchType extends KllSketch, ValueTyp
     return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
   }
 
+  @Nullable
+  @Override
+  public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
+  {
+    if (this == preAggregated) {
+      return getCombiningFactory();
+    }
+    if (getClass() != preAggregated.getClass()) {
+      return null;
+    }
+    KllSketchAggregatorFactory<?, ?> that = (KllSketchAggregatorFactory<?, ?>) preAggregated;
+    if (Objects.equals(fieldName, that.fieldName) && k == that.k && maxStreamLength <= that.maxStreamLength) { // k must match exactly
+      return getCombiningFactory();
+    }
+    return null;
+  }
+
   @Override
   public boolean equals(final Object o)
   {
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 4778b950a17..acdef51178f 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -424,6 +424,25 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
     return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
   }
 
+  @Nullable
+  @Override
+  public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
+  {
+    if (this == preAggregated) {
+      return getCombiningFactory();
+    }
+
+    if (getClass() != preAggregated.getClass()) {
+      return null;
+    }
+
+    DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) preAggregated;
+    if (k <= that.k && maxStreamLength <= that.getMaxStreamLength() && Objects.equals(fieldName, that.fieldName)) { // stored sketches need k and maxStreamLength >= the query's
+      return getCombiningFactory();
+    }
+    return null;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index b24e382ec0a..5a2baec256c 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -49,6 +49,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 
 public abstract class SketchAggregatorFactory extends AggregatorFactory
 {
@@ -266,6 +267,22 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
                      .array();
   }
 
+  @Override
+  public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
+  {
+    if (this == preAggregated) {
+      return getCombiningFactory();
+    }
+    if (getClass() != preAggregated.getClass()) {
+      return null;
+    }
+    SketchAggregatorFactory that = (SketchAggregatorFactory) preAggregated; // cast to this base type: fieldName/size are read from it, and a subclass cast would throw ClassCastException for other subclasses
+    if (Objects.equals(fieldName, that.fieldName) && size <= that.size) {
+      return getCombiningFactory();
+    }
+    return null;
+  }
+
   @Override
   public String toString()
   {
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
index addf76d38df..8a7dec444a9 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nullable;
+import java.util.Objects;
 
 public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
 {
@@ -165,6 +166,25 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
     );
   }
 
+  @Override
+  public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
+  {
+    if (this == preAggregated) {
+      return getCombiningFactory();
+    }
+    if (getClass() != preAggregated.getClass()) {
+      return null;
+    }
+    SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated;
+    if (Objects.equals(fieldName, that.fieldName) &&
+        size <= that.size && // stored sketches must be at least as large as the query's
+        isInputThetaSketch == that.isInputThetaSketch
+    ) {
+      return getCombiningFactory();
+    }
+    return null;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
index 3fe939b4ca8..8f0adce1ec3 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
@@ -307,6 +307,29 @@ public class ArrayOfDoublesSketchAggregatorFactory extends AggregatorFactory
     return ColumnType.DOUBLE;
   }
 
+  @Nullable
+  @Override
+  public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
+  {
+    if (this == preAggregated) {
+      return getCombiningFactory();
+    }
+
+    if (getClass() != preAggregated.getClass()) {
+      return null;
+    }
+
+    ArrayOfDoublesSketchAggregatorFactory that = (ArrayOfDoublesSketchAggregatorFactory) preAggregated;
+    if (nominalEntries <= that.nominalEntries && // stored sketches need at least the query's nominal entries
+        numberOfValues == that.numberOfValues &&
+        Objects.equals(fieldName, that.fieldName) &&
+        Objects.equals(metricColumns, that.metricColumns)
+    ) {
+      return getCombiningFactory();
+    }
+    return null;
+  }
+
   @Override
   public String toString()
   {
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
index c56598e4a72..8389c81ce51 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java
@@ -153,4 +153,22 @@ public class KllDoublesSketchAggregatorFactoryTest
         new TimeseriesQueryQueryToolChest().resultArraySignature(query)
     );
   }
+
+  @Test
+  public void testCanSubstitute()
+  {
+    AggregatorFactory sketch = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, null);
+    AggregatorFactory sketch2 = new KllDoublesSketchAggregatorFactory("other", "x", 200, null);
+    AggregatorFactory sketch3 = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, 1_000L); // smaller maxStreamLength
+    AggregatorFactory sketch4 = new KllDoublesSketchAggregatorFactory("sketch", "y", 200, null); // different field
+    AggregatorFactory sketch5 = new KllDoublesSketchAggregatorFactory("sketch", "x", 300, null); // different k
+
+    Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+    Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch2));
+    Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch));
+    Assert.assertNotNull(sketch2.substituteCombiningFactory(sketch));
+    Assert.assertNull(sketch.substituteCombiningFactory(sketch3));
+    Assert.assertNull(sketch.substituteCombiningFactory(sketch4));
+    Assert.assertNull(sketch.substituteCombiningFactory(sketch5));
+  }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
index f5a15cde242..9af69174eec 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
@@ -201,4 +201,19 @@ public class DoublesSketchAggregatorFactoryTest
     ac.fold(new TestDoublesSketchColumnValueSelector());
     Assert.assertNotNull(ac.getObject());
   }
+
+  @Test
+  public void testCanSubstitute()
+  {
+    final DoublesSketchAggregatorFactory sketch = new DoublesSketchAggregatorFactory("sketch", "x", 1024, 1000L, null);
+    final DoublesSketchAggregatorFactory sketch2 = new DoublesSketchAggregatorFactory("other", "x", 1024, 2000L, null); // larger maxStreamLength
+    final DoublesSketchAggregatorFactory sketch3 = new DoublesSketchAggregatorFactory("another", "x", 2048, 1000L, null); // larger k
+    final DoublesSketchAggregatorFactory incompatible = new DoublesSketchAggregatorFactory("incompatible", "y", 1024, 1000L, null); // different field
+
+    Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+    Assert.assertNotNull(sketch.substituteCombiningFactory(sketch3));
+    Assert.assertNull(sketch2.substituteCombiningFactory(sketch3));
+    Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
+    Assert.assertNull(sketch3.substituteCombiningFactory(sketch));
+  }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
index 23887652a73..1b2565dde29 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.aggregation.AggregatorAndSize;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
@@ -213,4 +214,18 @@ public class SketchAggregatorFactoryTest
     Throwable exception = Assert.assertThrows(DruidException.class, () -> AGGREGATOR_16384.factorizeVector(vectorFactory));
     Assert.assertEquals("Unsupported input [x] of type [COMPLEX<json>] for aggregator [COMPLEX<thetaSketchBuild>].", exception.getMessage());
   }
+
+  @Test
+  public void testCanSubstitute()
+  {
+    AggregatorFactory sketch1 = new SketchMergeAggregatorFactory("sketch", "x", 16, true, false, 2);
+    AggregatorFactory sketch2 = new SketchMergeAggregatorFactory("other", "x", null, false, false, null);
+    AggregatorFactory sketch3 = new SketchMergeAggregatorFactory("sketch", "x", null, false, false, 3);
+    AggregatorFactory sketch4 = new SketchMergeAggregatorFactory("sketch", "y", null, false, false, null); // different field
+
+    Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch2));
+    Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch3));
+    Assert.assertNull(sketch1.substituteCombiningFactory(sketch4));
+    Assert.assertNull(sketch2.substituteCombiningFactory(sketch1));
+  }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
index 10f2afe2248..d62c6c61ce7 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java
@@ -118,4 +118,21 @@ public class ArrayOfDoublesSketchAggregatorFactoryTest
     Assert.assertEquals(factory, factory.withName("name"));
     Assert.assertEquals("newTest", factory.withName("newTest").getName());
   }
+
+  @Test
+  public void testCanSubstitute()
+  {
+    AggregatorFactory sketch = new ArrayOfDoublesSketchAggregatorFactory("sketch", "x", null, null, null);
+    AggregatorFactory sketch2 = new ArrayOfDoublesSketchAggregatorFactory("sketch2", "x", null, null, null);
+    AggregatorFactory other = new ArrayOfDoublesSketchAggregatorFactory("other", "x", 8192, null, null); // more nominal entries than `sketch`
+    AggregatorFactory incompatible = new ArrayOfDoublesSketchAggregatorFactory("incompatible", "x", 2048, null, null); // fewer nominal entries than `sketch`
+    AggregatorFactory incompatible2 = new ArrayOfDoublesSketchAggregatorFactory("sketch", "y", null, null, null); // different field
+    Assert.assertNotNull(sketch.substituteCombiningFactory(other));
+    Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
+    Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
+    Assert.assertNotNull(sketch.substituteCombiningFactory(sketch));
+    Assert.assertNull(other.substituteCombiningFactory(sketch));
+    Assert.assertNull(sketch.substituteCombiningFactory(incompatible2));
+    Assert.assertNull(other.substituteCombiningFactory(incompatible2));
+  }
 }
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java
new file mode 100644
index 00000000000..9b0af45ef19
--- /dev/null
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.datasketches.common.Family;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.thetacommon.ThetaUtil;
+import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
+import 
org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch;
+import 
org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder;
+import org.apache.druid.collections.CloseableDefaultBlockingPool;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchHolder;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
+import 
org.apache.druid.query.aggregation.datasketches.kll.KllDoublesSketchAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.kll.KllSketchModule;
+import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
+import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder;
+import 
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
+import 
org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchAggregatorFactory;
+import 
org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
+import org.apache.druid.query.groupby.GroupingEngine;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * like {@link CursorFactoryProjectionTest} but for sketch aggs
+ */
+@RunWith(Parameterized.class)
+public class DatasketchesProjectionTest extends InitializedNullHandlingTest
+{
+  private static final Closer CLOSER = Closer.create();
+
+  private static final List<AggregateProjectionSpec> PROJECTIONS = Collections.singletonList(
+      new AggregateProjectionSpec(
+          "a_projection", // hourly (__gran) rollup grouped on 'a', with one agg per sketch family
+          VirtualColumns.create(
+              Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+          ),
+          Arrays.asList(
+              new LongDimensionSchema("__gran"),
+              new StringDimensionSchema("a")
+          ),
+          new AggregatorFactory[]{
+              new HllSketchBuildAggregatorFactory("_b_hll", "b", null, null, null, null, false),
+              new SketchMergeAggregatorFactory("_b_theta", "b", null, null, false, null),
+              new DoublesSketchAggregatorFactory("_d_doubles", "d", null),
+              new ArrayOfDoublesSketchAggregatorFactory("_bcd_aod", "b", null, Arrays.asList("c", "d"), null),
+              new KllDoublesSketchAggregatorFactory("_d_kll", "d", null, null)
+          }
+      )
+  );
+
+  private static final List<AggregateProjectionSpec> AUTO_PROJECTIONS = PROJECTIONS.stream().map(projection -> {
+    return new AggregateProjectionSpec(
+        projection.getName(),
+        projection.getVirtualColumns(),
+        projection.getGroupingColumns()
+                  .stream()
+                  .map(x -> new AutoTypeColumnSchema(x.getName(), null)) // same grouping columns, re-declared as auto-typed
+                  .collect(Collectors.toList()),
+        projection.getAggregators()
+    );
+  }).collect(Collectors.toList());
+
+  @Parameterized.Parameters(name = "name: {0}, sortByDim: {3}, autoSchema: {4}")
+  public static Collection<?> constructorFeeder()
+  {
+    HllSketchModule.registerSerde();
+    TestHelper.JSON_MAPPER.registerModules(new HllSketchModule().getJacksonModules());
+    SketchModule.registerSerde();
+    TestHelper.JSON_MAPPER.registerModules(new SketchModule().getJacksonModules());
+    KllSketchModule.registerSerde();
+    TestHelper.JSON_MAPPER.registerModules(new KllSketchModule().getJacksonModules());
+    DoublesSketchModule.registerSerde();
+    TestHelper.JSON_MAPPER.registerModules(new DoublesSketchModule().getJacksonModules());
+    ArrayOfDoublesSketchModule.registerSerde();
+    TestHelper.JSON_MAPPER.registerModules(new ArrayOfDoublesSketchModule().getJacksonModules());
+
+    final List<Object[]> constructors = new ArrayList<>();
+    final DimensionsSpec.Builder dimensionsBuilder =
+        DimensionsSpec.builder()
+                      .setDimensions(
+                          Arrays.asList(
+                              new StringDimensionSchema("a"),
+                              new StringDimensionSchema("b"),
+                              new LongDimensionSchema("c"),
+                              new DoubleDimensionSchema("d"),
+                              new FloatDimensionSchema("e")
+                          )
+                      );
+    DimensionsSpec dimsTimeOrdered = dimensionsBuilder.build();
+    DimensionsSpec dimsOrdered = dimensionsBuilder.setForceSegmentSortByTime(false).build(); // used when sortByDim is true
+
+
+    List<DimensionSchema> autoDims = dimsOrdered.getDimensions()
+                                                .stream()
+                                                .map(x -> new AutoTypeColumnSchema(x.getName(), null))
+                                                .collect(Collectors.toList());
+    for (boolean incremental : new boolean[]{true, false}) {
+      for (boolean sortByDim : new boolean[]{true, false}) {
+        for (boolean autoSchema : new boolean[]{true, false}) {
+          final DimensionsSpec dims;
+          if (sortByDim) {
+            if (autoSchema) {
+              dims = dimsOrdered.withDimensions(autoDims);
+            } else {
+              dims = dimsOrdered;
+            }
+          } else {
+            if (autoSchema) {
+              dims = dimsTimeOrdered.withDimensions(autoDims);
+            } else {
+              dims = dimsTimeOrdered;
+            }
+          }
+          if (incremental) {
+            IncrementalIndex index = CLOSER.register(makeBuilder(dims, autoSchema).buildIncrementalIndex());
+            constructors.add(new Object[]{
+                "incrementalIndex",
+                new IncrementalIndexCursorFactory(index),
+                new IncrementalIndexTimeBoundaryInspector(index),
+                sortByDim,
+                autoSchema
+            });
+          } else {
+            QueryableIndex index = CLOSER.register(makeBuilder(dims, autoSchema).buildMMappedIndex());
+            constructors.add(new Object[]{
+                "queryableIndex",
+                new QueryableIndexCursorFactory(index),
+                QueryableIndexTimeBoundaryInspector.create(index),
+                sortByDim,
+                autoSchema
+            });
+          }
+        }
+      }
+    }
+    return constructors;
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException
+  {
+    CLOSER.close(); // releases every index and temp-dir cleanup registered by constructorFeeder/makeBuilder
+  }
+
+  private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema)
+  {
+    File tmp = FileUtils.createTempDir();
+    CLOSER.register(() -> FileUtils.deleteDirectory(tmp)); // recursive delete: File.delete() silently fails on a non-empty directory
+    return IndexBuilder.create()
+                       .tmpDir(tmp)
+                       .schema(
+                           IncrementalIndexSchema.builder()
+                                                 .withDimensionsSpec(dimensionsSpec)
+                                                 .withRollup(false)
+                                                 .withMinTimestamp(CursorFactoryProjectionTest.TIMESTAMP.getMillis())
+                                                 .withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
+                                                 .build()
+                       )
+                       .rows(CursorFactoryProjectionTest.ROWS);
+  }
+
+  public final CursorFactory projectionsCursorFactory;
+  public final TimeBoundaryInspector projectionsTimeBoundaryInspector;
+
+  private final GroupingEngine groupingEngine;
+
+  private final NonBlockingPool<ByteBuffer> nonBlockingPool;
+  public final boolean sortByDim;
+  public final boolean autoSchema;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false); // closes the buffer pools created in the constructor
+
+  public DatasketchesProjectionTest(
+      String name, // only used in the parameterized test display name
+      CursorFactory projectionsCursorFactory,
+      TimeBoundaryInspector projectionsTimeBoundaryInspector,
+      boolean sortByDim,
+      boolean autoSchema
+  )
+  {
+    this.projectionsCursorFactory = projectionsCursorFactory;
+    this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector;
+    this.sortByDim = sortByDim;
+    this.autoSchema = autoSchema;
+    this.nonBlockingPool = closer.closeLater(
+        new CloseableStupidPool<>(
+            "GroupByQueryEngine-bufferPool",
+            () -> ByteBuffer.allocate(1 << 24) // 16 MiB heap buffers
+        )
+    );
+    this.groupingEngine = new GroupingEngine(
+        new DruidProcessingConfig(),
+        GroupByQueryConfig::new,
+        new GroupByResourcesReservationPool(
+            closer.closeLater(
+                new CloseableDefaultBlockingPool<>(
+                    () -> ByteBuffer.allocate(1 << 24),
+                    5
+                )
+            ),
+            new GroupByQueryConfig()
+        ),
+        TestHelper.makeJsonMapper(),
+        TestHelper.makeSmileMapper(),
+        (query, future) -> { // intentionally empty
+        }
+    );
+  }
+
+  @Test
+  public void testProjectionSingleDim()
+  {
+    // a query grouping only on 'a' should be answerable from a_projection (grouped on __gran and a)
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("a")
+                    .setAggregatorSpecs(
+                        new HllSketchBuildAggregatorFactory("b_distinct", "b", null, null, null, true, true),
+                        new SketchMergeAggregatorFactory("b_distinct_theta", "b", null, null, null, null),
+                        new DoublesSketchAggregatorFactory("d_doubles", "d", null, null, null),
+                        new ArrayOfDoublesSketchAggregatorFactory("b_doubles", "b", null, Arrays.asList("c", "d"), null),
+                        new KllDoublesSketchAggregatorFactory("d", "d", null, null)
+                    )
+                    .build();
+    final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
+    try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(3, rowCount);
+    }
+    final Sequence<ResultRow> resultRows = groupingEngine.process(
+        query,
+        projectionsCursorFactory,
+        projectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> results = resultRows.toList();
+    Assert.assertEquals(2, results.size());
+    List<Object[]> expectedResults = getSingleDimExpected();
+    final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.NO);
+    for (int i = 0; i < expectedResults.size(); i++) {
+      assertResults(
+          expectedResults.get(i),
+          results.get(i).getArray(),
+          querySignature
+      );
+    }
+  }
+
+  @Test
+  public void testProjectionSingleDimNoProjections()
+  {
+    // Same query as testProjectionSingleDim, but with projections disabled through the NO_PROJECTIONS context flag.
+    // The cursor must therefore NOT be served from the single dimension projection: it is expected to visit 8 rows
+    // (versus 3 when the projection is used), while the group-by results must match the projection case exactly.
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .addDimension("a")
+                    .setAggregatorSpecs(
+                        new HllSketchBuildAggregatorFactory("b_distinct", "b", null, null, null, true, true),
+                        new SketchMergeAggregatorFactory("b_distinct_theta", "b", null, null, null, null),
+                        new DoublesSketchAggregatorFactory("d_doubles", "d", null, null, null),
+                        new ArrayOfDoublesSketchAggregatorFactory("b_doubles", "b", null, Arrays.asList("c", "d"), null),
+                        new KllDoublesSketchAggregatorFactory("d", "d", null, null)
+                    )
+                    .setContext(ImmutableMap.of(QueryContexts.NO_PROJECTIONS, true))
+                    .build();
+    final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
+    try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      // 8 rows here vs. 3 in testProjectionSingleDim: the projection must not have been used
+      Assert.assertEquals(8, rowCount);
+    }
+    final Sequence<ResultRow> resultRows = groupingEngine.process(
+        query,
+        projectionsCursorFactory,
+        projectionsTimeBoundaryInspector,
+        nonBlockingPool,
+        null
+    );
+    final List<ResultRow> results = resultRows.toList();
+    Assert.assertEquals(2, results.size());
+    // disabling projections must not change the answer: compare against the same expectations as the projection test
+    List<Object[]> expectedResults = getSingleDimExpected();
+    final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.NO);
+    for (int i = 0; i < expectedResults.size(); i++) {
+      assertResults(
+          expectedResults.get(i),
+          results.get(i).getArray(),
+          querySignature
+      );
+    }
+  }
+
+  private List<Object[]> getSingleDimExpected()
+  {
+    HllSketch hll1 = new HllSketch(HllSketch.DEFAULT_LG_K);
+    Union theta1 = (Union) SetOperation.builder().build(Family.UNION);
+    DoublesSketch d1 = 
DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build();
+    ArrayOfDoublesUpdatableSketch ad1 = new 
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES)
+                                                                               
   .setNumberOfValues(2)
+                                                                               
   .build();
+    KllDoublesSketch kll1 = KllDoublesSketch.newHeapInstance();
+    hll1.update("aa");
+    hll1.update("bb");
+    hll1.update("cc");
+    hll1.update("dd");
+    theta1.update("aa");
+    theta1.update("bb");
+    theta1.update("cc");
+    theta1.update("dd");
+    d1.update(1.0);
+    d1.update(1.1);
+    d1.update(2.2);
+    d1.update(1.1);
+    d1.update(2.2);
+    ad1.update("aa", new double[]{1.0, 1.0});
+    ad1.update("bb", new double[]{1.0, 1.1});
+    ad1.update("cc", new double[]{2.0, 2.2});
+    ad1.update("aa", new double[]{1.0, 1.1});
+    ad1.update("dd", new double[]{2.0, 2.2});
+    kll1.update(1.0);
+    kll1.update(1.1);
+    kll1.update(2.2);
+    kll1.update(1.1);
+    kll1.update(2.2);
+    HllSketch hll2 = new HllSketch(HllSketch.DEFAULT_LG_K);
+    Union theta2 = (Union) SetOperation.builder().build(Family.UNION);
+    DoublesSketch d2 = 
DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build();
+    ArrayOfDoublesUpdatableSketch ad2 = new 
ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES)
+                                                                               
   .setNumberOfValues(2)
+                                                                               
   .build();
+    KllDoublesSketch kll2 = KllDoublesSketch.newHeapInstance();
+    hll2.update("aa");
+    hll2.update("bb");
+    theta2.update("aa");
+    theta2.update("bb");
+    d2.update(3.3);
+    d2.update(4.4);
+    d2.update(5.5);
+    ad2.update("aa", new double[]{3.0, 3.3});
+    ad2.update("aa", new double[]{4.0, 4.4});
+    ad2.update("bb", new double[]{5.0, 5.5});
+    kll2.update(3.3);
+    kll2.update(4.4);
+    kll2.update(5.5);
+
+    return Arrays.asList(
+        new Object[]{"a", HllSketchHolder.of(hll1), SketchHolder.of(theta1), 
d1, ad1, kll1},
+        new Object[]{"b", HllSketchHolder.of(hll2), SketchHolder.of(theta2), 
d2, ad2, kll2}
+    );
+
+  }
+
+  private void assertResults(Object[] expected, Object[] actual, RowSignature 
signature)
+  {
+    Assert.assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; i++) {
+      if 
(signature.getColumnType(i).get().equals(ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME)))
 {
+        Assert.assertEquals(
+            ((HllSketchHolder) expected[i]).getEstimate(),
+            ((HllSketchHolder) actual[i]).getEstimate(),
+            0.01
+        );
+      } else if 
(signature.getColumnType(i).get().equals(DoublesSketchModule.TYPE)) {
+        Assert.assertEquals(
+            ((DoublesSketch) expected[i]).getMinItem(),
+            ((DoublesSketch) actual[i]).getMinItem(),
+            0.01
+        );
+        Assert.assertEquals(
+            ((DoublesSketch) expected[i]).getMaxItem(),
+            ((DoublesSketch) actual[i]).getMaxItem(),
+            0.01
+        );
+      } else if 
(signature.getColumnType(i).get().equals(ArrayOfDoublesSketchModule.BUILD_TYPE))
 {
+        Assert.assertEquals(
+            ((ArrayOfDoublesSketch) expected[i]).getEstimate(),
+            ((ArrayOfDoublesSketch) actual[i]).getEstimate(),
+            0.01
+        );
+        Assert.assertEquals(
+            ((ArrayOfDoublesSketch) expected[i]).getLowerBound(0),
+            ((ArrayOfDoublesSketch) actual[i]).getLowerBound(0),
+            0.01
+        );
+        Assert.assertEquals(
+            ((ArrayOfDoublesSketch) expected[i]).getUpperBound(0),
+            ((ArrayOfDoublesSketch) actual[i]).getUpperBound(0),
+            0.01
+        );
+      } else if 
(signature.getColumnType(i).get().equals(KllSketchModule.DOUBLES_TYPE)) {
+        Assert.assertEquals(
+            ((KllDoublesSketch) expected[i]).getMinItem(),
+            ((KllDoublesSketch) actual[i]).getMinItem(),
+            0.01
+        );
+        Assert.assertEquals(
+            ((KllDoublesSketch) expected[i]).getMaxItem(),
+            ((KllDoublesSketch) actual[i]).getMaxItem(),
+            0.01
+        );
+      } else {
+        Assert.assertEquals(expected[i], actual[i]);
+      }
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
 
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
index 7712cecb303..1b63d9cb1df 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java
@@ -332,6 +332,7 @@ public class AggregateProjectionMetadata
             if (combining != null) {
               matchBuilder.remapColumn(queryAgg.getName(), 
projectionAgg.getName()).addPreAggregatedAggregator(combining);
               foundMatch = true;
+              break;
             }
           }
           allMatch = allMatch && foundMatch;
diff --git 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 446e714f997..7eae70cc1f4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -99,15 +99,15 @@ import java.util.stream.Collectors;
 public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
 {
   private static final Closer CLOSER = Closer.create();
-  private static final DateTime TIMESTAMP = 
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
+  static final DateTime TIMESTAMP = 
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
 
-  private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+  static final RowSignature ROW_SIGNATURE = RowSignature.builder()
                                                                 .add("a", 
ColumnType.STRING)
                                                                 .add("b", 
ColumnType.STRING)
                                                                 .add("c", 
ColumnType.LONG)
                                                                 .add("d", 
ColumnType.DOUBLE)
                                                                 .build();
-  private static final List<InputRow> ROWS = Arrays.asList(
+  static final List<InputRow> ROWS = Arrays.asList(
       new ListBasedInputRow(
           ROW_SIGNATURE,
           TIMESTAMP,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to