abhishekagarwal87 commented on code in PR #15049:
URL: https://github.com/apache/druid/pull/15049#discussion_r1367857410
##########
docs/development/extensions-contrib/ddsketch-quantiles.md:
##########

Review Comment:
Please add a bit more colour here so users can decide when to use the DDSketch.

##########
extensions-contrib/ddsketch/src/main/java/org/apache/druid/query/aggregation/ddsketch/DDSketchAggregator.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.ddsketch;
+
+import com.datadoghq.sketch.ddsketch.DDSketch;
+import com.datadoghq.sketch.ddsketch.DDSketches;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Aggregator to build DDsketches on numeric values.
+ * It generally makes sense to use this aggregator during the ingestion time.
+ * <p>
+ * One can use this aggregator to build these sketches during query time too, just
+ * that it will be slower and more resource intensive.
+ */
+public class DDSketchAggregator implements Aggregator
+{
+
+ private final ColumnValueSelector selector;
+
+ @GuardedBy("this")
+ private DDSketch histogram;
+
+ public DDSketchAggregator(ColumnValueSelector selector, @Nullable Double relativeError, @Nullable Integer numBins)
+ {
+ int effectiveNumBins = numBins != null ? numBins : DDSketchAggregatorFactory.DEFAULT_NUM_BINS;
+ double effectiveRelativeError = relativeError != null ? relativeError : DDSketchAggregatorFactory.DEFAULT_RELATIVE_ERROR;
+ this.selector = selector;
+ this.histogram = DDSketches.collapsingLowestDense(effectiveRelativeError, effectiveNumBins);
+ }
+
+ @Override
+ public void aggregate()
+ {
+ Object obj = selector.getObject();
+ if (obj == null) {
+ return;
+ }
+ if (obj instanceof Number) {
+ synchronized (this) {
+ this.histogram.accept(((Number) obj).doubleValue());
+ }
+ } else if (obj instanceof DDSketch) {
+ synchronized (this) {
+ this.histogram.mergeWith((DDSketch) obj);
+ }
+ } else {
+ throw new IAE(
+ "Expected a number or an instance of DDSketch, but received [%s] of type [%s]",
+ obj,
+ obj.getClass()
+ );
+ }

Review Comment:
```suggestion
    synchronized (this) {
      if (obj instanceof Number) {
        this.histogram.accept(((Number) obj).doubleValue());
      } else if (obj instanceof DDSketch) {
        this.histogram.mergeWith((DDSketch) obj);
      } else {
        throw new IAE(
            "Expected a number or an instance of DDSketch, but received [%s] of type [%s]",
            obj,
            obj.getClass()
        );
      }
    }
```

##########
extensions-contrib/ddsketch/src/main/java/org/apache/druid/query/aggregation/ddsketch/DDSketchAggregator.java:
##########

Review Comment:
nit: the structure suggested above seems better.

##########
extensions-contrib/ddsketch/pom.xml:
##########
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>druid</artifactId>
+ <groupId>org.apache.druid</groupId>
+ <version>28.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.druid.extensions.contrib</groupId>
+ <artifactId>druid-ddsketch</artifactId>
+ <name>druid-ddsketch</name>
+ <description>Druid extension for generating ddsketch backed sketches</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.datadoghq</groupId>
+ <artifactId>sketches-java</artifactId>
+ <version>0.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.1</version>

Review Comment:
Let's not hardcode the version here.

##########
docs/development/extensions-contrib/ddsketch-quantiles.md:
##########
@@ -0,0 +1,111 @@
+---
+id: ddsketch-quantiles
+title: "DDSketches for Approximate Quantiles module"
+---
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+
+This module provides aggregators for approximate quantile queries using the [DDSketch](https://github.com/datadog/sketches-java) library. The DDSketch library provides a fast, and fully-mergeable quantile sketch with relative error. If the true quantile is 100, a sketch with relative error of 1% guarantees a quantile value between 101 and 99. This is important and highly valuable behavior for long tail distributions.
+
+To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) in the extensions load list.
+
+### Aggregator
+
+The result of the aggregation is a DDSketch that is the union of all sketches either built from raw data or read from the segments. The single number that is returned represents the total number of included data points. The default aggregator type of `ddSketch` uses the collapsingLowestDense strategy for storing and merging sketch. This means that in favor of keeping the highest values represented at the highest accuracy, the sketch will collapse and merge lower, smaller values in the sketch. Collapsed bins will lose accuracy guarantees. The default number of bins is 1000. Sketches can only be merged when using the same relativeError values.
+
+The `ddSketch` aggregator operates over raw data and precomputed sketches.
+
+```json
+{
+ "type" : "ddSketch",
+ "name" : <output_name>,
+ "fieldName" : <input_name>,
+ "relativeError" : <double(0, 1)>,
+ "numBins": <int>
+ }
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|Must be "ddSketch" |yes|
+|name|A String for the output (result) name of the calculation.|yes|
+|fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes|
+|relativeError||Describes the precision in which to store the sketch. Must be a number between 0 and 1.|no, defaults to 0.01 (1% error)|
+|numBins|Total number of bins the sketch is allowed to use to describe the distribution. This has a direct impact on max memory used|no, defaults to 1000|
+
+### Post Aggregators
+
+Users can query for a set of quantiles using the `quantilesFromDDSketch` post-aggregator on the sketches created by the `ddSketch` aggregators.
+
+```json

Review Comment:
Can you add a table describing the fields of the post-aggregator?

##########
extensions-contrib/ddsketch/src/main/java/org/apache/druid/query/aggregation/ddsketch/DDSketchAggregatorFactory.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.ddsketch;
+
+import com.datadoghq.sketch.ddsketch.DDSketch;
+import com.datadoghq.sketch.ddsketch.DDSketches;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.curator.shaded.com.google.common.math.IntMath;
+import org.apache.druid.query.aggregation.AggregateCombiner;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Aggregation operations over the ddsketch quantile sketch
+ * available on <a href="https://github.com/DataDog/sketches-java">github</a> and described
+ * in the paper
+ * <a href="https://blog.acolyer.org/2019/09/06/ddsketch/">
+ * Computing relative error quantiles using ddsketch</a>.
+ * <p>
+ */
+@JsonTypeName(DDSketchAggregatorFactory.TYPE_NAME)
+public class DDSketchAggregatorFactory extends AggregatorFactory
+{
+ // Default relative error
+ public static final double DEFAULT_RELATIVE_ERROR = 0.01;
+
+ // Default num bins
+ public static final int DEFAULT_NUM_BINS = 1000;
+
+ @Nonnull
+ private final String name;
+ @Nonnull
+ private final String fieldName;
+
+ private final double relativeError;
+
+ private final int numBins;
+
+ private final byte cacheTypeId;
+
+ public static final String TYPE_NAME = "ddSketch";
+ public static final ColumnType TYPE = ColumnType.ofComplex(TYPE_NAME);
+
+ @JsonCreator
+ public DDSketchAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("relativeError") final Double relativeError,
+ @JsonProperty("numBins") final Integer numBins
+ )
+ {
+ this(name, fieldName, relativeError, numBins, AggregatorUtil.DDSKETCH_CACHE_TYPE_ID);
+ }
+
+ DDSketchAggregatorFactory(
+ final String name,
+ final String fieldName,
+ @Nullable final Double relativeError,
+ @Nullable final Integer numBins,
+ final byte cacheTypeId
+ )
+ {
+ this.name = Objects.requireNonNull(name, "Must have a valid, non-null aggregator name");
+ this.fieldName = Objects.requireNonNull(fieldName, "Parameter fieldName must be specified");
+ this.relativeError = relativeError == null ? DEFAULT_RELATIVE_ERROR : relativeError;
+ this.numBins = numBins == null ? DEFAULT_NUM_BINS : numBins;
+ this.cacheTypeId = cacheTypeId;
+ }
+
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(
+ cacheTypeId
+ ).appendString(fieldName).appendDouble(relativeError).appendInt(numBins).build();
+ }
+
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory metricFactory)
+ {
+ return new DDSketchAggregator(metricFactory.makeColumnValueSelector(fieldName), relativeError, numBins);
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+ {
+ return new DDSketchBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), relativeError, numBins);
+ }
+
+ public static final Comparator<DDSketch> COMPARATOR = Comparator.nullsFirst(
+ Comparator.comparingLong(a -> a.serializedSize())
+ );
+
+ @Override
+ public Comparator getComparator()
+ {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(@Nullable Object lhs, @Nullable Object rhs)
+ {
+ if (lhs == null) {
+ return rhs;
+ }
+ if (rhs == null) {
+ return lhs;
+ }
+ DDSketch union = (DDSketch) lhs;
+ union.mergeWith((DDSketch) rhs);
+ return union;
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ return new DDSketchAggregatorFactory(name, name, relativeError, numBins);
+ }
+
+ @Override
+ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
+ {
+ if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
+ return getCombiningFactory();
+ } else {
+ throw new AggregatorFactoryNotMergeableException(this, other);
+ }
+ }
+
+ @Override
+ public List<AggregatorFactory> getRequiredColumns()
+ {
+ return Collections.singletonList(
+ new DDSketchAggregatorFactory(
+ name,
+ fieldName,
+ relativeError,
+ numBins
+ )
+ );
+ }
+
+ @Override
+ public Object deserialize(Object serializedSketch)
+ {
+ return DDSketchUtils.deserialize(serializedSketch);
+ }
+
+ @Nullable
+ @Override
+ public Object finalizeComputation(@Nullable Object object)
+ {
+ return object == null ? null : ((DDSketch) object).getCount();
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public double getRelativeError()
+ {
+ return relativeError;
+ }
+
+ @JsonProperty
+ public int getNumBins()
+ {
+ return numBins;
+ }
+
+ @Override
+ public List<String> requiredFields()
+ {
+ return Collections.singletonList(fieldName);
+ }
+
+ /**
+ * actual type is {@link DDSketch}
+ */
+ @Override
+ public ColumnType getIntermediateType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public ColumnType getResultType()
+ {
+ return TYPE;
+ }
+
+ /*
+ * Each bounded lower collapsing store yields a max size of numBins * 8 bytes (size Of Double) in terms of size.
+ * Since the sketch contains a store for positive values and negative values, a fully filled sketch at maximum would contain:
+ * 2 * numBins * 8Bytes for storage. Other tracked members of the serialized sketch are constant,
+ * so we add 1k as buffer for these members. These members include mapping reconstruction, and zero counts.
+ * These are tracked through doubles and integers and do not increase in size as the sketch accepts new values and merged.
+ *
+ */
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return IntMath.checkedMultiply(numBins, Double.BYTES * 2) // Postive + Negative Stores
+ + Double.BYTES // zeroCount
+ + Double.BYTES // gamma
+ + Double.BYTES // indexOffset
+ + Integer.BYTES // interpolationEnum
+ + 12; // collective protoscope descriptor max sizes
+
+ }
+
+ @Override
+ public AggregatorFactory withName(String newName)
+ {
+ return new DDSketchAggregatorFactory(newName, getFieldName(), getRelativeError(), getNumBins(), cacheTypeId);
+ }
+
+ @Override
+ public AggregateCombiner<DDSketch> makeAggregateCombiner()
+ {
+ return new ObjectAggregateCombiner<DDSketch>()
+ {
+ private DDSketch combined = DDSketches.collapsingLowestDense(relativeError, numBins);
+
+ @Override
+ public void reset(final ColumnValueSelector selector)
+ {
+ combined.clear();
+ fold(selector);
+ }
+
+ @Override
+ public void fold(final ColumnValueSelector selector)
+ {
+ DDSketch other = (DDSketch) selector.getObject();
+ if (other == null) {
+ return;
+ }
+ if (combined == null) {
+ combined = DDSketches.collapsingLowestDense(relativeError, numBins);
+ }

Review Comment:
This seems unnecessary since `combined` will never be null.

##########
extensions-contrib/ddsketch/pom.xml:
##########
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>2.0.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.1-jre</version>

Review Comment:
The version need not be hardcoded here.

##########
extensions-contrib/ddsketch/src/main/java/org/apache/druid/query/aggregation/ddsketch/DDSketchAggregatorFactory.java:
##########
+ /*
+ * Each bounded lower collapsing store yields a max size of numBins * 8 bytes (size Of Double) in terms of size.
+ * Since the sketch contains a store for positive values and negative values, a fully filled sketch at maximum would contain:
+ * 2 * numBins * 8Bytes for storage. Other tracked members of the serialized sketch are constant,
+ * so we add 1k as buffer for these members. These members include mapping reconstruction, and zero counts.

Review Comment:
Where are you adding this buffer? And is this something you have measured?

##########
docs/development/extensions-contrib/ddsketch-quantiles.md:
##########
+|property|description|required?|
+|--------|-----------|---------|
+|relativeError||Describes the precision in which to store the sketch. Must be a number between 0 and 1.|no, defaults to 0.01 (1% error)|
+|numBins|Total number of bins the sketch is allowed to use to describe the distribution. This has a direct impact on max memory used|no, defaults to 1000|

Review Comment:
Can you add some more details about the trade-offs?
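On the `relativeError` vs `numBins` trade-off question: one rough way to reason about it is the logarithmic mapping from the DDSketch paper, where γ = (1 + α) / (1 − α) for relative accuracy α and a positive value x lands in bucket ⌈log_γ x⌉. The sketch below is only an estimate under those assumptions. It covers positive values only, the class and method names are hypothetical, and the index mapping used by sketches-java may differ slightly. It estimates how many bins a value range needs, and how wide a max/min range a given `numBins` is guaranteed to cover before `collapsingLowestDense` has to start merging the lowest bins.

```java
// Rough bin-budget estimate for a DDSketch-style logarithmic mapping (positive values only).
// Hypothetical helper, not part of the extension; sketches-java may index slightly differently.
public final class DDSketchBinBudget
{
  private DDSketchBinBudget()
  {
  }

  // gamma = (1 + alpha) / (1 - alpha), as defined in the DDSketch paper
  static double gamma(double relativeError)
  {
    if (!(relativeError > 0 && relativeError < 1)) {
      throw new IllegalArgumentException("relativeError must be in (0, 1)");
    }
    return (1 + relativeError) / (1 - relativeError);
  }

  // Count of bucket indices ceil(log_gamma(x)) spanned by positive values in [min, max]
  static long binsNeeded(double relativeError, double min, double max)
  {
    if (!(min > 0 && max >= min)) {
      throw new IllegalArgumentException("need 0 < min <= max");
    }
    double logGamma = Math.log(gamma(relativeError));
    long lowIndex = (long) Math.ceil(Math.log(min) / logGamma);
    long highIndex = (long) Math.ceil(Math.log(max) / logGamma);
    return highIndex - lowIndex + 1;
  }

  // Largest max/min ratio guaranteed to fit in numBins contiguous bins without collapsing
  static double maxCoveredRatio(double relativeError, int numBins)
  {
    return Math.pow(gamma(relativeError), numBins - 1);
  }

  public static void main(String[] args)
  {
    // Example: latencies between 1 ms and 1 hour (3.6e6 ms)
    System.out.println(binsNeeded(0.01, 1, 3_600_000));   // 756
    System.out.println(binsNeeded(0.001, 1, 3_600_000));  // 7550
    System.out.println(maxCoveredRatio(0.01, 1000));      // about 4.76E8
  }
}
```

Under this estimate, the defaults (0.01, 1000 bins) comfortably cover a 1 ms to 1 h range, while tightening `relativeError` to 0.001 needs roughly ten times as many bins for the same range. Keeping `numBins` at 1000 with the tighter error would push the lowest part of that range into collapsed bins.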
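On the 1k buffer question for `getMaxIntermediateSize()`: evaluating the quoted method body term by term gives a fixed overhead of 40 bytes (three doubles, one int, and the literal 12). No ~1 KB buffer is added. With the default 1000 bins the method returns 16,040 bytes. Below is a standalone restatement of that arithmetic. The class and method names are hypothetical, and `Math.multiplyExact` stands in for `IntMath.checkedMultiply` (both throw `ArithmeticException` on int overflow). It only reproduces the formula and does not measure real serialized sketch sizes.

```java
// Term-by-term restatement of the quoted getMaxIntermediateSize() formula (hypothetical names).
// Math.multiplyExact replaces IntMath.checkedMultiply; both throw ArithmeticException on overflow.
public final class DDSketchMaxSizeCheck
{
  private DDSketchMaxSizeCheck()
  {
  }

  // Everything except the two bin stores, exactly as written in the quoted method
  static int fixedOverhead()
  {
    return Double.BYTES    // zeroCount
        + Double.BYTES     // gamma
        + Double.BYTES     // indexOffset
        + Integer.BYTES    // interpolationEnum
        + 12;              // descriptor sizes
  }

  // Positive + negative stores at 8 bytes per bin, plus the fixed overhead
  static int maxIntermediateSize(int numBins)
  {
    return Math.multiplyExact(numBins, Double.BYTES * 2) + fixedOverhead();
  }

  public static void main(String[] args)
  {
    System.out.println(fixedOverhead());            // 40
    System.out.println(maxIntermediateSize(1000));  // 16040
  }
}
```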
