gianm commented on code in PR #12498:
URL: https://github.com/apache/druid/pull/12498#discussion_r941590264


##########
docs/development/extensions-core/datasketches-quantiles.md:
##########
@@ -56,7 +56,7 @@ The result of the aggregation is a DoublesSketch that is the 
union of all sketch
 |name|A String for the output (result) name of the calculation.|yes|
 |fieldName|A String for the name of the input field (can contain sketches or 
raw numeric values).|yes|
 |k|Parameter that determines the accuracy and size of the sketch. Higher k 
means higher accuracy but more space to store sketches. Must be a power of 2 
from 2 to 32768. See [accuracy 
information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch)
 in the DataSketches documentation for details.|no, defaults to 128|
-|maxStreamLength|This parameter is a temporary solution to avoid a [known 
issue](https://github.com/apache/druid/issues/11544). It may be removed in a 
future release after the bug is fixed. This parameter defines the maximum 
number of items to store in each sketch. If a sketch reaches the limit, the 
query can throw `IllegalStateException`. To workaround this issue, increase the 
maximum stream length. See [accuracy 
information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch)
 in the DataSketches documentation for how many bytes are required per stream 
length.|no, defaults to 1000000000|

Review Comment:
   Yeah, good call. Documenting something as a temporary solution is a great 
way to ensure it sticks around forever 🙂



##########
docs/development/extensions-core/datasketches-kll.md:
##########
@@ -0,0 +1,138 @@
+---
+id: datasketches-kll
+title: "DataSketches KLL Sketch module"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+
+This module provides Apache Druid aggregators based on numeric quantiles 
KllFloatsSketch and KllDoublesSketch from [Apache 
DataSketches](https://datasketches.apache.org/) library. KLL quantiles sketch 
is a mergeable streaming algorithm to estimate the distribution of values, and 
approximately answer queries about the rank of a value, probability mass 
function of the distribution (PMF) or histogram, cumulative distribution 
function (CDF), and quantiles (median, min, max, 95th percentile and such). See 
[Quantiles Sketch 
Overview](https://datasketches.apache.org/docs/Quantiles/QuantilesOverview). 
This document applies to both KllFloatsSketch and KllDoublesSketch. Only one of 
them will be used in the examples.
+
+There are three major modes of operation:
+
+1. Ingesting sketches built outside of Druid (say, with Pig or Hive)
+2. Building sketches from raw data during ingestion
+3. Building sketches from raw data at query time
+
+To use this aggregator, make sure you 
[include](../../development/extensions.md#loading-extensions) the extension in 
your config file:
+
+```
+druid.extensions.loadList=["druid-datasketches"]
+```
+
+### Aggregator
+
+The result of the aggregation is a KllFloatsSketch or KllDoublesSketch that is 
the union of all sketches either built from raw data or read from the segments.
+
+```json
+{
+  "type" : "KllDoublesSketch",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "k": <parameter that controls size and accuracy>
+ }
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This String should be "KllFloatsSketch" or "KllDoublesSketch"|yes|
+|name|A String for the output (result) name of the calculation.|yes|
+|fieldName|A String for the name of the input field (can contain sketches or 
raw numeric values).|yes|
+|k|Parameter that determines the accuracy and size of the sketch. Higher k 
means higher accuracy but more space to store sketches. Must be from 8 to 
65535.|no, defaults to 200|

Review Comment:
   Is there a table available of k -> space required and expected accuracy? 
Would be great to provide some additional guidance to users about this. It's 
the most common question I get about using sketches in Druid.
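
   As a rough starting point for such a table, the sketch below evaluates the a-priori normalized rank error as a function of `k`. The class name `KllRankError` is hypothetical, and the constants are an assumption recalled from the DataSketches KLL implementation (the library exposes a similar `getNormalizedRankError(k, pmf)` call); verify them against the library version in use before documenting any numbers. It does not cover the space side of the question.

```java
// Hypothetical helper, not part of Druid or DataSketches.
final class KllRankError
{
  // Approximate normalized rank error for a KLL sketch with parameter k.
  // pmf = true gives the "double-sided" error used for PMF/histogram queries;
  // pmf = false gives the error for rank, quantile, and CDF queries.
  // ASSUMPTION: constants recalled from the DataSketches KLL code.
  static double normalizedRankError(final int k, final boolean pmf)
  {
    return pmf
           ? 2.446 / Math.pow(k, 0.9433)
           : 2.296 / Math.pow(k, 0.9723);
  }

  public static void main(final String[] args)
  {
    // Print one row per candidate k so the trend is visible.
    for (final int k : new int[]{8, 50, 100, 200, 400, 800}) {
      System.out.printf(
          "k=%d rank error=%.4f pmf error=%.4f%n",
          k,
          normalizedRankError(k, false),
          normalizedRankError(k, true)
      );
    }
  }
}
```

   Under these assumed constants the default `k` of 200 comes out at roughly 1.3% rank error, and the error shrinks slightly slower than linearly as `k` grows.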



##########
core/src/main/java/org/apache/druid/segment/column/ColumnType.java:
##########
@@ -34,10 +34,10 @@ public class ColumnType extends BaseTypeSignature<ValueType>
   public static final ColumnType DOUBLE = new ColumnType(ValueType.DOUBLE, 
null, null);
   public static final ColumnType FLOAT = new ColumnType(ValueType.FLOAT, null, 
null);
   // currently, arrays only come from expressions or aggregators
-  // and there are no native float expressions (or aggs which produce float 
arrays)
   public static final ColumnType STRING_ARRAY = new 
ColumnType(ValueType.ARRAY, null, STRING);
   public static final ColumnType LONG_ARRAY = new ColumnType(ValueType.ARRAY, 
null, LONG);
   public static final ColumnType DOUBLE_ARRAY = new 
ColumnType(ValueType.ARRAY, null, DOUBLE);
+  public static final ColumnType FLOAT_ARRAY = new ColumnType(ValueType.ARRAY, 
null, FLOAT);

Review Comment:
   @clintropolis what are your thoughts on the implications of adding float 
array? Do we need to update any other code?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchToQuantilesPostAggregator.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.kll;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.PostAggregatorIds;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+
+public class KllDoublesSketchToQuantilesPostAggregator implements 
PostAggregator
+{
+
+  private final String name;
+  private final PostAggregator field;
+  private final double[] fractions;
+
+  @JsonCreator
+  public KllDoublesSketchToQuantilesPostAggregator(
+      @JsonProperty("name") final String name,
+      @JsonProperty("field") final PostAggregator field,
+      @JsonProperty("fractions") final double[] fractions)
+  {
+    this.name = Preconditions.checkNotNull(name, "name is null");
+    this.field = Preconditions.checkNotNull(field, "field is null");
+    this.fractions = Preconditions.checkNotNull(fractions, "array of fractions 
is null");
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @Override
+  public ColumnType getType(ColumnInspector signature)
+  {
+    return ColumnType.DOUBLE_ARRAY;
+  }
+
+  @JsonProperty
+  public PostAggregator getField()
+  {
+    return field;
+  }
+
+  @JsonProperty
+  public double[] getFractions()
+  {
+    return fractions;
+  }
+
+  @Override
+  public Object compute(final Map<String, Object> combinedAggregators)
+  {
+    final KllDoublesSketch sketch = (KllDoublesSketch) 
field.compute(combinedAggregators);
+    if (sketch.isEmpty()) {
+      final double[] quantiles = new double[fractions.length];

Review Comment:
   @clintropolis similar question here about `DOUBLE_ARRAY` and `double[]`?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.kll;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class KllDoublesSketchAggregatorFactory extends 
KllSketchAggregatorFactory<KllDoublesSketch, Double>
+{
+  public static final Comparator<KllDoublesSketch> COMPARATOR =
+      Comparator.nullsFirst(Comparator.comparingLong(KllDoublesSketch::getN));
+
+  @JsonCreator
+  public KllDoublesSketchAggregatorFactory(
+      @JsonProperty("name") final String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("k") @Nullable final Integer k,
+      @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
+  )
+  {
+    this(name, fieldName, k, maxStreamLength, 
AggregatorUtil.KLL_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
+  }
+
+  KllDoublesSketchAggregatorFactory(
+      final String name,
+      final String fieldName,
+      @Nullable final Integer k,
+      @Nullable final Long maxStreamLength,
+      final byte cacheTypeId
+  )
+  {
+    super(
+        name,
+        fieldName,
+        k,
+        maxStreamLength,
+        cacheTypeId
+    );
+  }
+
+  @Override
+  public Comparator<KllDoublesSketch> getComparator()
+  {
+    return COMPARATOR;
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(
+        new KllDoublesSketchAggregatorFactory(
+            getFieldName(),
+            getFieldName(),
+            getK(),
+            getMaxStreamLength()
+        )
+    );
+  }
+
+  @Override
+  public AggregatorFactory getMergingFactory(final AggregatorFactory other)
+      throws AggregatorFactoryNotMergeableException
+  {
+    if (other.getName().equals(this.getName()) && other instanceof 
KllDoublesSketchAggregatorFactory) {
+      // KllSketch supports merging with different k.
+      // The result will have effective k between the specified k and the 
minimum k from all input sketches
+      // to achieve higher accuracy as much as possible.
+      return new KllDoublesSketchMergeAggregatorFactory(
+          getName(),
+          Math.max(getK(), ((KllDoublesSketchAggregatorFactory) other).getK()),
+          getMaxStreamLength()

Review Comment:
   It's nicer for this stuff to be symmetric; would it make sense to use max 
here?
   
   Like, `Math.max(getMaxStreamLength(), ((KllDoublesSketchAggregatorFactory) 
other).getMaxStreamLength())`
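
   A minimal sketch of the symmetry being asked for, using a hypothetical stand-in class (`KllMergeParams`) rather than the real factory. It only illustrates that taking the max of both settings makes the merge result independent of which side `getMergingFactory` is called on; it is not the PR's implementation.

```java
// Hypothetical stand-in for the mergeable settings of the factory.
final class KllMergeParams
{
  final int k;
  final long maxStreamLength;

  KllMergeParams(final int k, final long maxStreamLength)
  {
    this.k = k;
    this.maxStreamLength = maxStreamLength;
  }

  // Symmetric merge: a.mergeWith(b) and b.mergeWith(a) yield the same settings,
  // because both fields take the larger of the two inputs.
  KllMergeParams mergeWith(final KllMergeParams other)
  {
    return new KllMergeParams(
        Math.max(k, other.k),
        Math.max(maxStreamLength, other.maxStreamLength)
    );
  }

  public static void main(final String[] args)
  {
    final KllMergeParams a = new KllMergeParams(200, 1_000L);
    final KllMergeParams b = new KllMergeParams(400, 5_000L);
    final KllMergeParams ab = a.mergeWith(b);
    final KllMergeParams ba = b.mergeWith(a);
    System.out.println(ab.k == ba.k && ab.maxStreamLength == ba.maxStreamLength);
  }
}
```

   With the current code, which passes only `getMaxStreamLength()` from `this`, the two call orders could produce different `maxStreamLength` values.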



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.kll;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class KllFloatsSketchAggregatorFactory extends 
KllSketchAggregatorFactory<KllFloatsSketch, Float>
+{
+  public static final Comparator<KllFloatsSketch> COMPARATOR =
+      Comparator.nullsFirst(Comparator.comparingLong(KllFloatsSketch::getN));
+
+  @JsonCreator
+  public KllFloatsSketchAggregatorFactory(
+      @JsonProperty("name") final String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("k") @Nullable final Integer k,
+      @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
+  )
+  {
+    this(name, fieldName, k, maxStreamLength, 
AggregatorUtil.KLL_FLOATS_SKETCH_BUILD_CACHE_TYPE_ID);
+  }
+
+  KllFloatsSketchAggregatorFactory(
+      final String name,
+      final String fieldName,
+      @Nullable final Integer k,
+      @Nullable final Long maxStreamLength,
+      final byte cacheTypeId
+  )
+  {
+    super(
+        name,
+        fieldName,
+        k,
+        maxStreamLength,
+        cacheTypeId
+    );
+  }
+
+  @Override
+  public Comparator<KllFloatsSketch> getComparator()
+  {
+    return COMPARATOR;
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(
+        new KllFloatsSketchAggregatorFactory(
+            getFieldName(),
+            getFieldName(),
+            getK(),
+            getMaxStreamLength()
+        )
+    );
+  }
+
+  @Override
+  public AggregatorFactory getMergingFactory(final AggregatorFactory other)
+      throws AggregatorFactoryNotMergeableException
+  {
+    if (other.getName().equals(this.getName()) && other instanceof 
KllFloatsSketchAggregatorFactory) {
+      // KllSketch supports merging with different k.
+      // The result will have effective k between the specified k and the 
minimum k from all input sketches
+      // to achieve higher accuracy as much as possible.
+      return new KllFloatsSketchMergeAggregatorFactory(
+          getName(),
+          Math.max(getK(), ((KllFloatsSketchAggregatorFactory) other).getK()),
+          getMaxStreamLength()

Review Comment:
   Similar comment to KllDoublesSketchAggregatorFactory: It's nicer for this 
stuff to be symmetric; would it make sense to use max here?



##########
docs/development/extensions-core/datasketches-kll.md:
##########
@@ -0,0 +1,138 @@
+---
+id: datasketches-kll
+title: "DataSketches KLL Sketch module"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+
+This module provides Apache Druid aggregators based on numeric quantiles 
KllFloatsSketch and KllDoublesSketch from [Apache 
DataSketches](https://datasketches.apache.org/) library. KLL quantiles sketch 
is a mergeable streaming algorithm to estimate the distribution of values, and 
approximately answer queries about the rank of a value, probability mass 
function of the distribution (PMF) or histogram, cumulative distribution 
function (CDF), and quantiles (median, min, max, 95th percentile and such). See 
[Quantiles Sketch 
Overview](https://datasketches.apache.org/docs/Quantiles/QuantilesOverview). 
This document applies to both KllFloatsSketch and KllDoublesSketch. Only one of 
them will be used in the examples.
+
+There are three major modes of operation:
+
+1. Ingesting sketches built outside of Druid (say, with Pig or Hive)
+2. Building sketches from raw data during ingestion
+3. Building sketches from raw data at query time
+
+To use this aggregator, make sure you 
[include](../../development/extensions.md#loading-extensions) the extension in 
your config file:
+
+```
+druid.extensions.loadList=["druid-datasketches"]
+```
+
+### Aggregator
+
+The result of the aggregation is a KllFloatsSketch or KllDoublesSketch that is 
the union of all sketches either built from raw data or read from the segments.
+
+```json
+{
+  "type" : "KllDoublesSketch",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "k": <parameter that controls size and accuracy>
+ }
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This String should be "KllFloatsSketch" or "KllDoublesSketch"|yes|
+|name|A String for the output (result) name of the calculation.|yes|
+|fieldName|A String for the name of the input field (can contain sketches or 
raw numeric values).|yes|
+|k|Parameter that determines the accuracy and size of the sketch. Higher k 
means higher accuracy but more space to store sketches. Must be from 8 to 
65535.|no, defaults to 200|
+|maxStreamLength|This parameter defines the number of items presented to each 
sketch before it might, in the context of a BufferAggregator, grow larger than 
a preallocated memory region and need to move on heap. Ideally just a few 
sketches should grow that large.|no, defaults to 1000000000|

Review Comment:
   Users won't necessarily know what BufferAggregators are, since they're an 
implementation detail. Suggested alternate wording:
   
   > This parameter defines the number of items that can be presented to each 
sketch before it may need to move from off-heap to on-heap memory. This is 
relevant to query types that use off-heap memory, including 
[TopN](../../querying/topnquery.md) and 
[GroupBy](../../querying/groupbyquery.md). Ideally, set it high enough that 
most sketches can stay off-heap.
   
   This raises the question in users' minds: why not set it to some huge value 
like `Long.MAX_VALUE`? I guess that the downside of setting `maxStreamLength` 
higher is increased off-heap memory requirements. Is there a table or formula 
available about the relationship between `maxStreamLength` and required 
off-heap memory?



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchToCDFPostAggregator.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.kll;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.PostAggregatorIds;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+
+public class KllDoublesSketchToCDFPostAggregator implements PostAggregator
+{
+
+  private final String name;
+  private final PostAggregator field;
+  private final double[] splitPoints;
+
+  @JsonCreator
+  public KllDoublesSketchToCDFPostAggregator(
+      @JsonProperty("name") final String name,
+      @JsonProperty("field") final PostAggregator field,
+      @JsonProperty("splitPoints") final double[] splitPoints)
+  {
+    this.name = Preconditions.checkNotNull(name, "name is null");
+    this.field = Preconditions.checkNotNull(field, "field is null");
+    this.splitPoints = Preconditions.checkNotNull(splitPoints, "array of split 
points is null");
+  }
+
+  @Override
+  public Object compute(final Map<String, Object> combinedAggregators)
+  {
+    final KllDoublesSketch sketch = (KllDoublesSketch) 
field.compute(combinedAggregators);
+    if (sketch.isEmpty()) {
+      final double[] cdf = new double[splitPoints.length + 1];
+      Arrays.fill(cdf, Double.NaN);
+      return cdf;

Review Comment:
   @clintropolis is `double[]` a valid return class for something declared as 
`DOUBLE_ARRAY`? (Or should it be `Double[]` or `Object[]`?) I didn't find 
javadocs where we codify this. Ideally it should be linked from ColumnType or 
ValueType.
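
   To make the concern concrete: in plain Java a `double[]` is not an `Object[]`, so any consumer that casts a `DOUBLE_ARRAY` value to `Object[]` would fail on a primitive array. The sketch below shows only that language-level fact plus one way to box the values. The class and method names are hypothetical, and whether Druid's array handling actually requires `Object[]` is exactly the open question above.

```java
import java.util.Arrays;

// Hypothetical illustration, not Druid code.
final class PrimitiveArrayCheck
{
  // Boxes a primitive double[] into an Object[] whose elements are Double.
  static Object[] box(final double[] values)
  {
    return Arrays.stream(values).boxed().toArray();
  }

  // True only for reference-typed arrays; primitive arrays are not Object[].
  static boolean isObjectArray(final Object o)
  {
    return o instanceof Object[];
  }

  public static void main(final String[] args)
  {
    final double[] cdf = new double[]{Double.NaN, Double.NaN};
    System.out.println(isObjectArray(cdf));
    System.out.println(isObjectArray(box(cdf)));
  }
}
```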



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.kll;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.datasketches.kll.KllSketch;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.AggregateCombiner;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+abstract class KllSketchAggregatorFactory<SketchType extends KllSketch, 
ValueType> extends AggregatorFactory
+{
+  public static final int DEFAULT_K = 200;
+
+  // Used for sketch size estimation.
+  public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000;
+
+  private final String name;
+  private final String fieldName;
+  private final int k;
+  private final long maxStreamLength;
+  private final byte cacheTypeId;
+
+  KllSketchAggregatorFactory(
+      final String name,
+      final String fieldName,
+      @Nullable final Integer k,
+      @Nullable final Long maxStreamLength,
+      final byte cacheTypeId
+  )
+  {
+    if (name == null) {
+      throw new IAE("Must have a valid, non-null aggregator name");
+    }
+    this.name = name;
+    if (fieldName == null) {
+      throw new IAE("Parameter fieldName must be specified");
+    }
+    this.fieldName = fieldName;
+    this.k = k == null ? DEFAULT_K : k;
+    this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH 
: maxStreamLength;
+    this.cacheTypeId = cacheTypeId;
+  }
+
+  @Override
+  public Aggregator factorize(final ColumnSelectorFactory metricFactory)
+  {
+    if (metricFactory.getColumnCapabilities(fieldName) != null
+        && metricFactory.getColumnCapabilities(fieldName).isNumeric()) {
+      final ColumnValueSelector<ValueType> selector = 
metricFactory.makeColumnValueSelector(fieldName);
+      if (selector instanceof NilColumnValueSelector) {
+        return new KllSketchNoOpAggregator<SketchType>(getEmptySketch());
+      }
+      return getBuildAggregator(selector);
+    }
+    final ColumnValueSelector<SketchType> selector = 
metricFactory.makeColumnValueSelector(fieldName);
+    if (selector instanceof NilColumnValueSelector) {
+      return new KllSketchNoOpAggregator<SketchType>(getEmptySketch());
+    }
+    return getMergeAggregator(selector);
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(final ColumnSelectorFactory 
metricFactory)
+  {
+    if (metricFactory.getColumnCapabilities(fieldName) != null
+        && metricFactory.getColumnCapabilities(fieldName).isNumeric()) {
+      final ColumnValueSelector<ValueType> selector = 
metricFactory.makeColumnValueSelector(fieldName);
+      if (selector instanceof NilColumnValueSelector) {
+        return new KllSketchNoOpBufferAggregator<SketchType>(getEmptySketch());
+      }
+      return getBuildBufferAggregator(selector);
+    }
+    final ColumnValueSelector<SketchType> selector = 
metricFactory.makeColumnValueSelector(fieldName);
+    if (selector instanceof NilColumnValueSelector) {
+      return new KllSketchNoOpBufferAggregator<SketchType>(getEmptySketch());
+    }
+    return getMergeBufferAggregator(selector);
+  }
+
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;

Review Comment:
   🚀 🚀 🚀



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

