gianm commented on code in PR #12498: URL: https://github.com/apache/druid/pull/12498#discussion_r941590264
########## docs/development/extensions-core/datasketches-quantiles.md: ########## @@ -56,7 +56,7 @@ The result of the aggregation is a DoublesSketch that is the union of all sketch |name|A String for the output (result) name of the calculation.|yes| |fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes| |k|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Must be a power of 2 from 2 to 32768. See [accuracy information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch) in the DataSketches documentation for details.|no, defaults to 128| -|maxStreamLength|This parameter is a temporary solution to avoid a [known issue](https://github.com/apache/druid/issues/11544). It may be removed in a future release after the bug is fixed. This parameter defines the maximum number of items to store in each sketch. If a sketch reaches the limit, the query can throw `IllegalStateException`. To workaround this issue, increase the maximum stream length. See [accuracy information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch) in the DataSketches documentation for how many bytes are required per stream length.|no, defaults to 1000000000| Review Comment: Yeah, good call. Documenting something as a temporary solution is a great way to ensure it sticks around forever ๐ ########## docs/development/extensions-core/datasketches-kll.md: ########## @@ -0,0 +1,138 @@ +--- +id: datasketches-kll +title: "DataSketches KLL Sketch module" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + + +This module provides Apache Druid aggregators based on numeric quantiles KllFloatsSketch and KllDoublesSketch from [Apache DataSketches](https://datasketches.apache.org/) library. KLL quantiles sketch is a mergeable streaming algorithm to estimate the distribution of values, and approximately answer queries about the rank of a value, probability mass function of the distribution (PMF) or histogram, cumulative distribution function (CDF), and quantiles (median, min, max, 95th percentile and such). See [Quantiles Sketch Overview](https://datasketches.apache.org/docs/Quantiles/QuantilesOverview). This document applies to both KllFloatsSketch and KllDoublesSketch. Only one of them will be used in the examples. + +There are three major modes of operation: + +1. Ingesting sketches built outside of Druid (say, with Pig or Hive) +2. Building sketches from raw data during ingestion +3. Building sketches from raw data at query time + +To use this aggregator, make sure you [include](../../development/extensions.md#loading-extensions) the extension in your config file: + +``` +druid.extensions.loadList=["druid-datasketches"] +``` + +### Aggregator + +The result of the aggregation is a KllFloatsSketch or KllDoublesSketch that is the union of all sketches either built from raw data or read from the segments. + +```json +{ + "type" : "KllDoublesSketch", + "name" : <output_name>, + "fieldName" : <metric_name>, + "k": <parameter that controls size and accuracy> + } +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This String should be "KllFloatsSketch" or "KllDoublesSketch"|yes| +|name|A String for the output (result) name of the calculation.|yes| +|fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes| +|k|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Must be from 8 to 65535.|no, defaults to 200| Review Comment: Is there a table available of k -> space required and expected accuracy? Would be great to provide some additional guidance to users about this. It's the most common question I get about using sketches in Druid. ########## core/src/main/java/org/apache/druid/segment/column/ColumnType.java: ########## @@ -34,10 +34,10 @@ public class ColumnType extends BaseTypeSignature<ValueType> public static final ColumnType DOUBLE = new ColumnType(ValueType.DOUBLE, null, null); public static final ColumnType FLOAT = new ColumnType(ValueType.FLOAT, null, null); // currently, arrays only come from expressions or aggregators - // and there are no native float expressions (or aggs which produce float arrays) public static final ColumnType STRING_ARRAY = new ColumnType(ValueType.ARRAY, null, STRING); public static final ColumnType LONG_ARRAY = new ColumnType(ValueType.ARRAY, null, LONG); public static final ColumnType DOUBLE_ARRAY = new ColumnType(ValueType.ARRAY, null, DOUBLE); + public static final ColumnType FLOAT_ARRAY = new ColumnType(ValueType.ARRAY, null, FLOAT); Review Comment: @clintropolis what are your thoughts on the implications of adding float array? Do we need to update any other code? ########## extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchToQuantilesPostAggregator.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.kll; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +public class KllDoublesSketchToQuantilesPostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + private final double[] fractions; + + @JsonCreator + public KllDoublesSketchToQuantilesPostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("fractions") final double[] fractions) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.fractions = Preconditions.checkNotNull(fractions, "array of fractions is null"); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Override + public ColumnType getType(ColumnInspector signature) + { + return ColumnType.DOUBLE_ARRAY; + } + + @JsonProperty + public PostAggregator getField() + { + return field; + } + + @JsonProperty + public double[] getFractions() + { + return fractions; + } + + @Override + public Object compute(final Map<String, Object> combinedAggregators) + { + final KllDoublesSketch sketch = (KllDoublesSketch) field.compute(combinedAggregators); + if (sketch.isEmpty()) { + final double[] quantiles = new double[fractions.length]; Review Comment: @clintropolis similar question here about `DOUBLE_ARRAY` and `double[]`? ########## extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactory.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.kll; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class KllDoublesSketchAggregatorFactory extends KllSketchAggregatorFactory<KllDoublesSketch, Double> +{ + public static final Comparator<KllDoublesSketch> COMPARATOR = + Comparator.nullsFirst(Comparator.comparingLong(KllDoublesSketch::getN)); + + @JsonCreator + public KllDoublesSketchAggregatorFactory( + @JsonProperty("name") final String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("k") @Nullable final Integer k, + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength + ) + { + this(name, fieldName, k, maxStreamLength, AggregatorUtil.KLL_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID); + } + + KllDoublesSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k, + @Nullable final Long maxStreamLength, + final byte cacheTypeId + ) + { + super( + name, + fieldName, + k, + maxStreamLength, + cacheTypeId + ); + } + + @Override + public Comparator<KllDoublesSketch> getComparator() + { + return COMPARATOR; + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Collections.singletonList( + new KllDoublesSketchAggregatorFactory( + getFieldName(), + getFieldName(), + getK(), + getMaxStreamLength() + ) + ); + } + + @Override + public AggregatorFactory getMergingFactory(final AggregatorFactory other) + throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && other instanceof KllDoublesSketchAggregatorFactory) { + // KllSketch supports merging with different k. + // The result will have effective k between the specified k and the minimum k from all input sketches + // to achieve higher accuracy as much as possible. + return new KllDoublesSketchMergeAggregatorFactory( + getName(), + Math.max(getK(), ((KllDoublesSketchAggregatorFactory) other).getK()), + getMaxStreamLength() Review Comment: It's nicer for this stuff to be symmetric; would it make sense to use max here? Like, `Math.max(getMaxStreamLength(), ((KllDoublesSketchAggregatorFactory) other).getMaxStreamLength())` ########## extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllFloatsSketchAggregatorFactory.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.kll; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.datasketches.kll.KllFloatsSketch; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.ColumnProcessors; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class KllFloatsSketchAggregatorFactory extends KllSketchAggregatorFactory<KllFloatsSketch, Float> +{ + public static final Comparator<KllFloatsSketch> COMPARATOR = + Comparator.nullsFirst(Comparator.comparingLong(KllFloatsSketch::getN)); + + @JsonCreator + public KllFloatsSketchAggregatorFactory( + @JsonProperty("name") final String name, + @JsonProperty("fieldName") final String fieldName, + @JsonProperty("k") @Nullable final Integer k, + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength + ) + { + this(name, fieldName, k, maxStreamLength, AggregatorUtil.KLL_FLOATS_SKETCH_BUILD_CACHE_TYPE_ID); + } + + KllFloatsSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k, + @Nullable final Long maxStreamLength, + final byte cacheTypeId + ) + { + super( + name, + fieldName, + k, + maxStreamLength, + cacheTypeId + ); + } + + @Override + public Comparator<KllFloatsSketch> getComparator() + { + return COMPARATOR; + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Collections.singletonList( + new KllFloatsSketchAggregatorFactory( + getFieldName(), + getFieldName(), + getK(), + getMaxStreamLength() + ) + ); + } + + @Override + public AggregatorFactory getMergingFactory(final AggregatorFactory other) + throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && other instanceof KllFloatsSketchAggregatorFactory) { + // KllSketch supports merging with different k. + // The result will have effective k between the specified k and the minimum k from all input sketches + // to achieve higher accuracy as much as possible. + return new KllFloatsSketchMergeAggregatorFactory( + getName(), + Math.max(getK(), ((KllFloatsSketchAggregatorFactory) other).getK()), + getMaxStreamLength() Review Comment: Similar comment to KllDoublesSketchAggregatorFactory: It's nicer for this stuff to be symmetric; would it make sense to use max here? ########## docs/development/extensions-core/datasketches-kll.md: ########## @@ -0,0 +1,138 @@ +--- +id: datasketches-kll +title: "DataSketches KLL Sketch module" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + + +This module provides Apache Druid aggregators based on numeric quantiles KllFloatsSketch and KllDoublesSketch from [Apache DataSketches](https://datasketches.apache.org/) library. KLL quantiles sketch is a mergeable streaming algorithm to estimate the distribution of values, and approximately answer queries about the rank of a value, probability mass function of the distribution (PMF) or histogram, cumulative distribution function (CDF), and quantiles (median, min, max, 95th percentile and such). See [Quantiles Sketch Overview](https://datasketches.apache.org/docs/Quantiles/QuantilesOverview). This document applies to both KllFloatsSketch and KllDoublesSketch. Only one of them will be used in the examples. + +There are three major modes of operation: + +1. Ingesting sketches built outside of Druid (say, with Pig or Hive) +2. Building sketches from raw data during ingestion +3. Building sketches from raw data at query time + +To use this aggregator, make sure you [include](../../development/extensions.md#loading-extensions) the extension in your config file: + +``` +druid.extensions.loadList=["druid-datasketches"] +``` + +### Aggregator + +The result of the aggregation is a KllFloatsSketch or KllDoublesSketch that is the union of all sketches either built from raw data or read from the segments. + +```json +{ + "type" : "KllDoublesSketch", + "name" : <output_name>, + "fieldName" : <metric_name>, + "k": <parameter that controls size and accuracy> + } +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This String should be "KllFloatsSketch" or "KllDoublesSketch"|yes| +|name|A String for the output (result) name of the calculation.|yes| +|fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes| +|k|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Must be from 8 to 65535.|no, defaults to 200| +|maxStreamLength|This parameter defines the number of items presented to each sketch before it might, in the context of a BufferAggregator, grow larger than a preallocated memory region and need to move on heap. Ideally just a few sketches should grow that large.|no, defaults to 1000000000| Review Comment: Users won't necessarily know what BufferAggregators are, since they're an implementation detail. Suggested alternate wording: > This parameter defines the number of items that can be presented to each sketch before it may need to move from off-heap to on-heap memory. This is relevant to query types that use off-heap memory, including `[TopN](../../querying/topnquery.md)` and `[GroupBy](../../querying/groupbyquery.md)`. Ideally, should be set high enough such that most sketches can stay off-heap. This raises the question in users' minds: why not set it to some huge value like `Long.MAX_VALUE`? I guess that the downside of setting `maxStreamLength` higher is increased off-heap memory requirements. Is there a table or formula available about the relationship between `maxStreamLength` and required off-heap memory? ########## extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchToCDFPostAggregator.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.kll; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +public class KllDoublesSketchToCDFPostAggregator implements PostAggregator +{ + + private final String name; + private final PostAggregator field; + private final double[] splitPoints; + + @JsonCreator + public KllDoublesSketchToCDFPostAggregator( + @JsonProperty("name") final String name, + @JsonProperty("field") final PostAggregator field, + @JsonProperty("splitPoints") final double[] splitPoints) + { + this.name = Preconditions.checkNotNull(name, "name is null"); + this.field = Preconditions.checkNotNull(field, "field is null"); + this.splitPoints = Preconditions.checkNotNull(splitPoints, "array of split points is null"); + } + + @Override + public Object compute(final Map<String, Object> combinedAggregators) + { + final KllDoublesSketch sketch = (KllDoublesSketch) field.compute(combinedAggregators); + if (sketch.isEmpty()) { + final double[] cdf = new double[splitPoints.length + 1]; + Arrays.fill(cdf, Double.NaN); + return cdf; Review Comment: @clintropolis is `double[]` a valid return class for something declared as `DOUBLE_ARRAY`? (Or should it be `Double[]` or `Object[]`?) I didn't find javadocs where we codify this. Ideallyย it should be linked from ColumnType or ValueType. ########## extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.kll; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.datasketches.kll.KllSketch; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregateCombiner; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +abstract class KllSketchAggregatorFactory<SketchType extends KllSketch, ValueType> extends AggregatorFactory +{ + public static final int DEFAULT_K = 200; + + // Used for sketch size estimation. + public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000; + + private final String name; + private final String fieldName; + private final int k; + private final long maxStreamLength; + private final byte cacheTypeId; + + KllSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k, + @Nullable final Long maxStreamLength, + final byte cacheTypeId + ) + { + if (name == null) { + throw new IAE("Must have a valid, non-null aggregator name"); + } + this.name = name; + if (fieldName == null) { + throw new IAE("Parameter fieldName must be specified"); + } + this.fieldName = fieldName; + this.k = k == null ? DEFAULT_K : k; + this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH : maxStreamLength; + this.cacheTypeId = cacheTypeId; + } + + @Override + public Aggregator factorize(final ColumnSelectorFactory metricFactory) + { + if (metricFactory.getColumnCapabilities(fieldName) != null + && metricFactory.getColumnCapabilities(fieldName).isNumeric()) { + final ColumnValueSelector<ValueType> selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { + return new KllSketchNoOpAggregator<SketchType>(getEmptySketch()); + } + return getBuildAggregator(selector); + } + final ColumnValueSelector<SketchType> selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { + return new KllSketchNoOpAggregator<SketchType>(getEmptySketch()); + } + return getMergeAggregator(selector); + } + + @Override + public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) + { + if (metricFactory.getColumnCapabilities(fieldName) != null + && metricFactory.getColumnCapabilities(fieldName).isNumeric()) { + final ColumnValueSelector<ValueType> selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { + return new KllSketchNoOpBufferAggregator<SketchType>(getEmptySketch()); + } + return getBuildBufferAggregator(selector); + } + final ColumnValueSelector<SketchType> selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { + return new KllSketchNoOpBufferAggregator<SketchType>(getEmptySketch()); + } + return getMergeBufferAggregator(selector); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; Review Comment: ๐ ๐ ๐ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
