This is an automated email from the ASF dual-hosted git repository. frankchen pushed a commit to branch spdi-31 in repository https://gitbox.apache.org/repos/asf/druid.git
commit 49ca9eddc171a8f4c052fe72eb9cc095aea7b3ce Author: Phua Guan Wei <[email protected]> AuthorDate: Mon Jan 27 06:32:29 2025 +0000 Add Bitmap64 and Bitmap32 extensions --- distribution/pom.xml | 2 + extensions-contrib/druid-exactcount/README.md | 238 +++++++++++++++++++++ extensions-contrib/druid-exactcount/pom.xml | 182 ++++++++++++++++ .../exactcount/bitmap32/Bitmap32Counter.java | 31 +++ .../Bitmap32ExactCountAggregatorFactory.java | 210 ++++++++++++++++++ .../Bitmap32ExactCountBuildAggregator.java | 68 ++++++ .../Bitmap32ExactCountBuildAggregatorFactory.java | 73 +++++++ .../Bitmap32ExactCountBuildBufferAggregator.java | 134 ++++++++++++ .../Bitmap32ExactCountBuildComplexMetricSerde.java | 52 +++++ .../Bitmap32ExactCountMergeAggregator.java | 68 ++++++ .../Bitmap32ExactCountMergeAggregatorFactory.java | 78 +++++++ .../Bitmap32ExactCountMergeBufferAggregator.java | 107 +++++++++ .../Bitmap32ExactCountMergeComplexMetricSerde.java | 108 ++++++++++ .../bitmap32/Bitmap32ExactCountModule.java | 119 +++++++++++ .../bitmap32/Bitmap32ExactCountObjectStrategy.java | 66 ++++++ .../bitmap32/Bitmap32ExactCountPostAggregator.java | 138 ++++++++++++ .../bitmap32/RoaringBitmap32Counter.java | 102 +++++++++ .../sql/Bitmap32ExactCountSqlAggregator.java | 162 ++++++++++++++ .../exactcount/bitmap64/Bitmap64Counter.java | 33 +++ .../Bitmap64ExactCountAggregatorFactory.java | 210 ++++++++++++++++++ .../Bitmap64ExactCountBuildAggregator.java | 68 ++++++ .../Bitmap64ExactCountBuildAggregatorFactory.java | 73 +++++++ .../Bitmap64ExactCountBuildBufferAggregator.java | 134 ++++++++++++ .../Bitmap64ExactCountBuildComplexMetricSerde.java | 52 +++++ .../Bitmap64ExactCountMergeAggregator.java | 68 ++++++ .../Bitmap64ExactCountMergeAggregatorFactory.java | 76 +++++++ .../Bitmap64ExactCountMergeBufferAggregator.java | 107 +++++++++ .../Bitmap64ExactCountMergeComplexMetricSerde.java | 106 +++++++++ .../bitmap64/Bitmap64ExactCountModule.java | 72 +++++++ 
.../bitmap64/Bitmap64ExactCountObjectStrategy.java | 64 ++++++ .../bitmap64/Bitmap64ExactCountPostAggregator.java | 138 ++++++++++++ .../bitmap64/RoaringBitmap64Counter.java | 97 +++++++++ .../RoaringBitmap64CounterJsonSerializer.java | 42 ++++ .../sql/Bitmap64ExactCountSqlAggregator.java | 163 ++++++++++++++ .../org.apache.druid.initialization.DruidModule | 17 ++ pom.xml | 1 + .../druid/query/aggregation/AggregatorUtil.java | 9 + .../query/aggregation/post/PostAggregatorIds.java | 2 + 38 files changed, 3470 insertions(+) diff --git a/distribution/pom.xml b/distribution/pom.xml index de18b289f6b..240a897726f 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -458,6 +458,8 @@ <argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument> <argument>-c</argument> <argument>org.apache.druid.extensions.contrib:druid-rabbit-indexing-service</argument> + <argument>-c</argument> + <argument>org.apache.druid.extensions:druid-exactcount</argument> </arguments> </configuration> </execution> diff --git a/extensions-contrib/druid-exactcount/README.md b/extensions-contrib/druid-exactcount/README.md new file mode 100644 index 00000000000..3d9a50af872 --- /dev/null +++ b/extensions-contrib/druid-exactcount/README.md @@ -0,0 +1,238 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. 
See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +This module provides exact distinct count for long type column. + +Usage for `Bitmap64ExactCountBuild` and `Bitmap64ExactCountMerge`: +Kafka ingestion task spec: +``` +{ + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "ticker_event_bitmap64_exact_count_rollup", + "timestampSpec": { + "column": "timestamp", + "format": "millis", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "key" + } + ], + "dimensionExclusions": [] + }, + "metricsSpec": [ + { + "type": "Bitmap64ExactCountBuild", + "name": "count", + "fieldName": "value" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": "HOUR", + "rollup": true, + "intervals": null + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "ticker_event", + "inputFormat": { + "type": "json", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [] + }, + "featureSpec": {} + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": false, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "stream": "ticker_event", + "useEarliestSequenceNumber": false, + "type": "kafka" + } + } +} +``` +Query from rollup datasource: +``` +{ + "queryType": "timeseries", + "dataSource": { + "type": "table", + "name": "ticker_event_bitmap64_exact_count_rollup" + }, + "intervals": { + "type": "intervals", + "intervals": [ + "2020-09-13T06:35:35.000Z/146140482-04-24T15:36:27.903Z" + ] + }, + "descending": false, + "virtualColumns": [], + "filter": null, + "granularity": { + "type": "all" + }, + 
+ "aggregations": [ + { + "type": "Bitmap64ExactCountMerge", + "name": "a0", + "fieldName": "count" + } + ], + "postAggregations": [], + "limit": 2147483647 +} +``` +Query with a post-aggregator: +``` +{ + "queryType": "timeseries", + "dataSource": { + "type": "table", + "name": "ticker_event_bitmap64_exact_count_rollup" + }, + "intervals": { + "type": "intervals", + "intervals": [ + "2020-09-13T06:35:35.000Z/146140482-04-24T15:36:27.903Z" + ] + }, + "descending": false, + "virtualColumns": [], + "filter": null, + "granularity": { + "type": "all" + }, + "aggregations": [ + { + "type": "count", + "name": "cnt" + }, + { + "type": "Bitmap64ExactCountMerge", + "name": "a0", + "fieldName": "count" + } + ], + "postAggregations": [ + { + "type": "arithmetic", + "fn": "/", + "fields": [ + { + "type": "bitmap64ExactCountCardinality", + "name": "a0", + "fieldName": "a0" + }, + { + "type": "fieldAccess", + "name": "cnt", + "fieldName": "cnt" + } + ], + "name": "rollup_rate" + } + ], + "limit": 2147483647 +} +``` +SQL query: +``` +SELECT "key", BITMAP64_EXACT_COUNT("count") +FROM "ticker_event_bitmap64_exact_count_rollup" +WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '1' DAY +GROUP BY key +``` + +Note: the `longExactCount` aggregator is recommended for `timeseries` queries, though it also works for `topN`
eg: +``` +{ + "queryType": "timeseries", + "dataSource": "ticker_event", + "intervals": { + "type": "intervals", + "intervals": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "descending": false, + "virtualColumns": [], + "filter": null, + "granularity": { + "type": "all" + }, + "aggregations": [ + { + "type": "longExactCount", + "name": "exactCount", + "fieldName": "value", + "expression": null + } + ] +} +``` +``` +{ + "queryType": "groupBy", + "dataSource": "ticker_event", + "dimensions": ["key"], + "intervals": { + "type": "intervals", + "intervals": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "granularity": { + "type": "all" + }, + "aggregations": [ + { + "type": "longExactCount", + "name": "a0", + "fieldName": "value" + } + ] +} +``` + + diff --git a/extensions-contrib/druid-exactcount/pom.xml b/extensions-contrib/druid-exactcount/pom.xml new file mode 100644 index 00000000000..b9dcb90695e --- /dev/null +++ b/extensions-contrib/druid-exactcount/pom.xml @@ -0,0 +1,182 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.druid.extensions</groupId> + <artifactId>druid-exactcount</artifactId> + <name>druid-exactcount</name> + <description>Druid Aggregators for exact distinct count</description> + + <parent> + <groupId>org.apache.druid</groupId> + <artifactId>druid</artifactId> + <version>27.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-server</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-sql</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> +
<groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-guava</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-joda</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-smile</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-smile-provider</artifactId> + <scope>provided</scope> + </dependency> + + <!-- Test Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-server</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + 
<type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-sql</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32Counter.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32Counter.java new file mode 100644 index 00000000000..542b69ea400 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32Counter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +public interface Bitmap32Counter +{ + void add(int value); + + long getCardinality(); + + Bitmap32Counter fold(Bitmap32Counter rhs); + + byte[] toBytes(); +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountAggregatorFactory.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountAggregatorFactory.java new file mode 100644 index 00000000000..c0f3cb7d003 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountAggregatorFactory.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.query.aggregation.AggregateCombiner; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * Base class for both build and merge factories + */ +@SuppressWarnings("NullableProblems") +public abstract class Bitmap32ExactCountAggregatorFactory extends AggregatorFactory +{ + static final int MAX_INTERMEDIATE_SIZE = 5 * 1024 * 1024; // 5 MB + static final Comparator<Bitmap32Counter> COMPARATOR = + Comparator.nullsFirst(Comparator.comparingLong(Bitmap32Counter::getCardinality)); + + private final String name; + private final String fieldName; + + Bitmap32ExactCountAggregatorFactory( + final String name, + final String fieldName + ) + { + this.name = Objects.requireNonNull(name); + this.fieldName = Objects.requireNonNull(fieldName); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Collections.singletonList(fieldName); + } + + /** + * This is a convoluted way to return a list of input field names this aggregator needs. + * Currently the returned factories are only used to obtain a field name by calling getName() method. 
+ */ + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Collections.singletonList( + new Bitmap32ExactCountBuildAggregatorFactory(fieldName, fieldName) + ); + } + + @Override + public Bitmap32Counter deserialize(final Object object) + { + return Bitmap32ExactCountMergeComplexMetricSerde.deserializeRoaringBitmap32Counter(object); + } + + @Override + public Bitmap32Counter combine(final Object objectA, final Object objectB) + { + if (objectB == null) { + return (Bitmap32Counter) objectA; + } + if (objectA == null) { + return (Bitmap32Counter) objectB; + } + ((Bitmap32Counter) objectA).fold((Bitmap32Counter) objectB); + return (Bitmap32Counter) objectA; + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new ObjectAggregateCombiner<Bitmap32Counter>() + { + private Bitmap32Counter union = new RoaringBitmap32Counter(false); + + @Override + public void reset(final ColumnValueSelector selector) + { + union = new RoaringBitmap32Counter(false); + fold(selector); + } + + @Override + public void fold(final ColumnValueSelector selector) + { + final Bitmap32Counter bitmap32Counter = (Bitmap32Counter) selector.getObject(); + union.fold(bitmap32Counter); + } + + @Nullable + @Override + public Bitmap32Counter getObject() + { + return union; + } + + @Override + public Class<Bitmap32Counter> classOfObject() + { + return Bitmap32Counter.class; + } + }; + } + + @Nullable + @Override + public Object finalizeComputation(@Nullable final Object object) + { + if (object == null) { + return null; + } + return ((Bitmap32Counter) object).getCardinality(); + } + + @Override + public Comparator<Bitmap32Counter> getComparator() + { + return COMPARATOR; + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new Bitmap32ExactCountMergeAggregatorFactory(getName(), getName()); + } + + @Override + public byte[] getCacheKey() + { + return new 
CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName).build(); + } + + @Override + public int getMaxIntermediateSize() + { + return MAX_INTERMEDIATE_SIZE; + } + + @Override + public boolean equals(final Object object) + { + if (this == object) { + return true; + } + if (object == null || !getClass().equals(object.getClass())) { + return false; + } + final Bitmap32ExactCountAggregatorFactory that = (Bitmap32ExactCountAggregatorFactory) object; + if (!name.equals(that.getName())) { + return false; + } + return fieldName.equals(that.getFieldName()); + } + + @Override + public int hashCode() + { + return Objects.hash(name, fieldName); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + " {" + + " name=" + name + + ", fieldName=" + fieldName + + " }"; + } + + protected abstract byte getCacheTypeId(); + +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregator.java new file mode 100644 index 00000000000..8af51bb04db --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.segment.BaseLongColumnValueSelector; + +import javax.annotation.Nullable; + +public class Bitmap32ExactCountBuildAggregator implements Aggregator +{ + private BaseLongColumnValueSelector selector; + private Bitmap32Counter bitmap; + + public Bitmap32ExactCountBuildAggregator(BaseLongColumnValueSelector selector) + { + this.selector = selector; + this.bitmap = new RoaringBitmap32Counter(true); + } + + @Override + public void aggregate() + { + bitmap.add(Math.toIntExact(selector.getLong())); + } + + @Nullable + @Override + public Object get() + { + return bitmap; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + bitmap = null; + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregatorFactory.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregatorFactory.java new file mode 100644 index 00000000000..3df594335dc --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildAggregatorFactory.java @@ -0,0 +1,73 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnType; + +@SuppressWarnings("NullableProblems") +public class Bitmap32ExactCountBuildAggregatorFactory extends Bitmap32ExactCountAggregatorFactory +{ + public static final ColumnType TYPE = ColumnType.ofComplex(Bitmap32ExactCountModule.BUILD_TYPE_NAME); + + @JsonCreator + public Bitmap32ExactCountBuildAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + super(name, fieldName); + } + + @Override + protected byte getCacheTypeId() + { + return AggregatorUtil.BITMAP32_EXACT_COUNT_BUILD_CACHE_TYPE_ID; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) + { + return new 
Bitmap32ExactCountBuildAggregator(columnSelectorFactory.makeColumnValueSelector(getFieldName())); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) + { + return new Bitmap32ExactCountBuildBufferAggregator(columnSelectorFactory.makeColumnValueSelector(getFieldName())); + } + + @Override + public ColumnType getIntermediateType() + { + return TYPE; + } + + @Override + public ColumnType getResultType() + { + return ColumnType.LONG; + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildBufferAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildBufferAggregator.java new file mode 100644 index 00000000000..9699048cbb7 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildBufferAggregator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.segment.BaseLongColumnValueSelector; + +import java.nio.ByteBuffer; +import java.util.IdentityHashMap; + +public class Bitmap32ExactCountBuildBufferAggregator implements BufferAggregator +{ + private final BaseLongColumnValueSelector selector; + private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Bitmap32Counter>> collectors = new IdentityHashMap<>(); + + public Bitmap32ExactCountBuildBufferAggregator(BaseLongColumnValueSelector selector) + { + this.selector = selector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + createNewCollector(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + final int oldPosition = buf.position(); + try { + buf.position(position); + Bitmap32Counter bitmap32Counter = getOrCreateCollector(buf, position); + bitmap32Counter.add(Math.toIntExact(selector.getLong())); + } + finally { + buf.position(oldPosition); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return getOrCreateCollector(buf, position); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + createNewCollector(newBuffer, newPosition); + Bitmap32Counter collector = 
collectors.get(oldBuffer).get(oldPosition); + putCollectors(newBuffer, newPosition, collector); + Int2ObjectMap<Bitmap32Counter> collectorMap = collectors.get(oldBuffer); + if (collectorMap != null) { + collectorMap.remove(oldPosition); + if (collectorMap.isEmpty()) { + collectors.remove(oldBuffer); + } + } + } + + private void putCollectors(final ByteBuffer buffer, final int position, final Bitmap32Counter collector) + { + Int2ObjectMap<Bitmap32Counter> map = collectors.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); + map.put(position, collector); + } + + private Bitmap32Counter getOrCreateCollector(ByteBuffer buf, int position) + { + Int2ObjectMap<Bitmap32Counter> collectMap = collectors.get(buf); + Bitmap32Counter bitmap32Counter = collectMap != null ? collectMap.get(position) : null; + if (bitmap32Counter != null) { + return bitmap32Counter; + } + + return createNewCollector(buf, position); + } + + private Bitmap32Counter createNewCollector(ByteBuffer buf, int position) + { + buf.position(position); + Bitmap32Counter bitmap32Counter = new RoaringBitmap32Counter(false); + Int2ObjectMap<Bitmap32Counter> collectorMap = collectors.get(buf); + if (collectorMap == null) { + collectorMap = new Int2ObjectOpenHashMap<>(); + collectors.put(buf, collectorMap); + } + collectorMap.put(position, bitmap32Counter); + return bitmap32Counter; + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildComplexMetricSerde.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildComplexMetricSerde.java new file mode 100644 index 00000000000..3989fd381ff --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountBuildComplexMetricSerde.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.segment.serde.ComplexMetricExtractor; + +import javax.annotation.Nullable; + +public class Bitmap32ExactCountBuildComplexMetricSerde extends Bitmap32ExactCountMergeComplexMetricSerde +{ + private static final ComplexMetricExtractor EXTRACTOR = new ComplexMetricExtractor() + { + + @Override + public Class extractedClass() + { + return Object.class; + } + + @Nullable + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + return inputRow.getRaw(metricName); + } + }; + + @Override + public ComplexMetricExtractor getExtractor() + { + return EXTRACTOR; + } + +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregator.java new file mode 100644 index 00000000000..2ef158e72bc --- /dev/null +++ 
b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class Bitmap32ExactCountMergeAggregator implements Aggregator +{ + private final ColumnValueSelector<Bitmap32Counter> selector; + private Bitmap32Counter bitmap; + + public Bitmap32ExactCountMergeAggregator(ColumnValueSelector<Bitmap32Counter> selector) + { + this.selector = selector; + this.bitmap = new RoaringBitmap32Counter(false); + } + + @Override + public void aggregate() + { + bitmap.fold(selector.getObject()); + } + + @Nullable + @Override + public Object get() + { + return bitmap; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + bitmap = null; + } +} diff --git 
a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregatorFactory.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregatorFactory.java new file mode 100644 index 00000000000..28df0a346f7 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeAggregatorFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnType; + +@SuppressWarnings("NullableProblems") +public class Bitmap32ExactCountMergeAggregatorFactory extends Bitmap32ExactCountAggregatorFactory +{ + public static final ColumnType TYPE = ColumnType.ofComplex(Bitmap32ExactCountModule.MERGE_TYPE_NAME); + + @JsonCreator + public Bitmap32ExactCountMergeAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + super(name, fieldName); + } + + @Override + protected byte getCacheTypeId() + { + return AggregatorUtil.BITMAP32_EXACT_COUNT_MERGE_CACHE_TYPE_ID; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) + { + @SuppressWarnings("unchecked") + ColumnValueSelector<Bitmap32Counter> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); + return new Bitmap32ExactCountMergeAggregator(selector); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) + { + @SuppressWarnings("unchecked") + ColumnValueSelector<Bitmap32Counter> selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); + return new Bitmap32ExactCountMergeBufferAggregator(selector); + } + + @Override + public ColumnType getIntermediateType() + { + return TYPE; + } + + @Override + public ColumnType getResultType() + { + return ColumnType.LONG; + } +} diff --git 
a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeBufferAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeBufferAggregator.java new file mode 100644 index 00000000000..1c626326712 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeBufferAggregator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.segment.ColumnValueSelector; + +import java.nio.ByteBuffer; +import java.util.IdentityHashMap; + +public class Bitmap32ExactCountMergeBufferAggregator implements BufferAggregator +{ + private final ColumnValueSelector<Bitmap32Counter> selector; + private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Bitmap32Counter>> counterCache = new IdentityHashMap<>(); + + public Bitmap32ExactCountMergeBufferAggregator(ColumnValueSelector<Bitmap32Counter> selector) + { + this.selector = selector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + RoaringBitmap32Counter emptyCounter = new RoaringBitmap32Counter(false); + addToCache(buf, position, emptyCounter); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + Object x = selector.getObject(); + if (x == null) { + return; + } + Bitmap32Counter bitmap32Counter = counterCache.get(buf).get(position); + bitmap32Counter.fold((RoaringBitmap32Counter) x); + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return counterCache.get(buf).get(position); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + + } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + Bitmap32Counter counter = counterCache.get(oldBuffer).get(oldPosition); + 
addToCache(newBuffer, newPosition, counter); + Int2ObjectMap<Bitmap32Counter> counterMap = counterCache.get(oldBuffer); + if (counterMap != null) { + counterMap.remove(oldPosition); + if (counterMap.isEmpty()) { + counterCache.remove(oldBuffer); + } + } + } + + private void addToCache(final ByteBuffer buffer, final int position, final Bitmap32Counter counter) + { + Int2ObjectMap<Bitmap32Counter> map = counterCache.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()); + map.put(position, counter); + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeComplexMetricSerde.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeComplexMetricSerde.java new file mode 100644 index 00000000000..e9160cb5d9a --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountMergeComplexMetricSerde.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.data.ObjectStrategy; +import org.apache.druid.segment.serde.ComplexColumnPartSupplier; +import org.apache.druid.segment.serde.ComplexMetricExtractor; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +import java.nio.ByteBuffer; + +public class Bitmap32ExactCountMergeComplexMetricSerde extends ComplexMetricSerde +{ + + static RoaringBitmap32Counter deserializeRoaringBitmap32Counter(final Object object) + { + if (object instanceof String) { + return new RoaringBitmap32Counter( + new ImmutableRoaringBitmap(ByteBuffer.wrap(StringUtils.decodeBase64(StringUtils.toUtf8((String) object))))); + } else if (object instanceof byte[]) { + return new RoaringBitmap32Counter(new ImmutableRoaringBitmap(ByteBuffer.wrap((byte[]) object))); + } else if (object instanceof RoaringBitmap32Counter) { + return (RoaringBitmap32Counter) object; + } + throw new IAE("Object is not of a type that can be deserialized to an RoaringBitmap32Counter:" + object.getClass() + .getName()); + } + + @Override + public String getTypeName() + { + return Bitmap32ExactCountModule.TYPE_NAME; // must be common type name + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return Bitmap32ExactCountObjectStrategy.STRATEGY; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class<?> 
extractedClass() + { + return RoaringBitmap32Counter.class; + } + + @Override + public RoaringBitmap32Counter extractValue(final InputRow inputRow, final String metricName) + { + final Object object = inputRow.getRaw(metricName); + if (object == null) { + return null; + } + return deserializeRoaringBitmap32Counter(object); + } + }; + } + + @Override + public void deserializeColumn(final ByteBuffer buf, final ColumnBuilder columnBuilder) + { + columnBuilder.setComplexColumnSupplier( + new ComplexColumnPartSupplier( + getTypeName(), + GenericIndexed.read(buf, Bitmap32ExactCountObjectStrategy.STRATEGY, columnBuilder.getFileMapper()) + ) + ); + } + + // support large columns + @Override + public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column) + { + return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); + } + +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountModule.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountModule.java new file mode 100644 index 00000000000..9c8810c25d0 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountModule.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.aggregation.exactcount.bitmap32.sql.Bitmap32ExactCountSqlAggregator; +import org.apache.druid.segment.serde.ComplexMetrics; +import org.apache.druid.sql.guice.SqlBindings; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class Bitmap32ExactCountModule implements DruidModule +{ + public static final String TYPE_NAME = "Bitmap32ExactCount"; // common type name to be associated with segment data + public static final String BUILD_TYPE_NAME = "Bitmap32ExactCountBuild"; + public static final String MERGE_TYPE_NAME = "Bitmap32ExactCountMerge"; + + @Override + public List<? 
extends Module> getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("Bitmap32ExactCountModule") + .registerSubtypes( + new NamedType(Bitmap32ExactCountMergeAggregatorFactory.class, MERGE_TYPE_NAME), + new NamedType(Bitmap32ExactCountBuildAggregatorFactory.class, BUILD_TYPE_NAME), + new NamedType(Bitmap32ExactCountPostAggregator.class, "bitmap32ExactCountCardinality") + ) + .addSerializer( + RoaringBitmap32Counter.class, + new RoaringBitmap32CounterJsonSerializer() + ) + .addDeserializer( + RoaringBitmap32Counter.class, + new RoaringBitmap32CounterJsonDeserializer() + ) + ); + } + + @Override + public void configure(Binder binder) + { + registerSerde(); + SqlBindings.addAggregator(binder, Bitmap32ExactCountSqlAggregator.class); + } + + @VisibleForTesting + public static void registerSerde() + { + ComplexMetrics.registerSerde(TYPE_NAME, new Bitmap32ExactCountMergeComplexMetricSerde()); + ComplexMetrics.registerSerde(BUILD_TYPE_NAME, new Bitmap32ExactCountBuildComplexMetricSerde()); + ComplexMetrics.registerSerde(MERGE_TYPE_NAME, new Bitmap32ExactCountMergeComplexMetricSerde()); + } + + private static class RoaringBitmap32CounterJsonDeserializer extends StdDeserializer<RoaringBitmap32Counter> + { + + protected RoaringBitmap32CounterJsonDeserializer() + { + super(RoaringBitmap32Counter.class); + } + + @Override + public RoaringBitmap32Counter deserialize(JsonParser p, DeserializationContext ctxt) throws IOException + { + return new RoaringBitmap32Counter(new ImmutableRoaringBitmap(ByteBuffer.wrap(p.getBinaryValue()))); + } + } + + private static class RoaringBitmap32CounterJsonSerializer extends StdSerializer<RoaringBitmap32Counter> + { + protected RoaringBitmap32CounterJsonSerializer() + { + super(RoaringBitmap32Counter.class); + } + + @Override + public void serialize( + final RoaringBitmap32Counter bitmap32Counter, + final JsonGenerator jgen, + final SerializerProvider provider + ) + throws IOException + { + 
jgen.writeBinary(bitmap32Counter.toBytes()); + } + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountObjectStrategy.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountObjectStrategy.java new file mode 100644 index 00000000000..9f598427559 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountObjectStrategy.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.apache.druid.segment.data.ObjectStrategy; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class Bitmap32ExactCountObjectStrategy implements ObjectStrategy<Bitmap32Counter> +{ + + static final Bitmap32ExactCountObjectStrategy STRATEGY = new Bitmap32ExactCountObjectStrategy(); + + @Override + public Class<? 
extends Bitmap32Counter> getClazz() + { + return RoaringBitmap32Counter.class; + } + + @Nullable + @Override + public Bitmap32Counter fromByteBuffer(ByteBuffer buffer, int numBytes) + { + if (numBytes == 0) { + return new RoaringBitmap32Counter(new MutableRoaringBitmap()); + } + buffer.limit(buffer.position() + numBytes); + return new RoaringBitmap32Counter(new ImmutableRoaringBitmap(buffer)); + } + + @Nullable + @Override + public byte[] toBytes(@Nullable Bitmap32Counter val) + { + if (val == null || val.getCardinality() == 0) { + return new byte[]{}; + } + return val.toBytes(); + } + + @Override + public int compare(Bitmap32Counter o1, Bitmap32Counter o2) + { + return Bitmap32ExactCountAggregatorFactory.COMPARATOR.compare(o1, o2); + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountPostAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountPostAggregator.java new file mode 100644 index 00000000000..c726dc062db --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/Bitmap32ExactCountPostAggregator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.Sets; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; +import org.apache.druid.query.aggregation.post.PostAggregatorIds; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nullable; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +@JsonTypeName("bitmap32ExactCountCardinality") +public class Bitmap32ExactCountPostAggregator implements PostAggregator +{ + private final String name; + private final String fieldName; + + @JsonCreator + public Bitmap32ExactCountPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + this.name = name; + this.fieldName = fieldName; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public Set<String> getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Comparator getComparator() + { + return ArithmeticPostAggregator.DEFAULT_COMPARATOR; + } + + @Override + @JsonProperty + public Object compute(Map<String, Object> combinedAggregators) + { + Object value = combinedAggregators.get(fieldName); + return ((Bitmap32Counter) value).getCardinality(); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Nullable + @Override + public ColumnType getType(ColumnInspector signature) + { 
+ return ColumnType.LONG; + } + + @Override + public PostAggregator decorate(Map<String, AggregatorFactory> aggregators) + { + return this; + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(PostAggregatorIds.BITMAP32_EXACT_COUNT_MERGE_TYPE_ID) + .appendString(fieldName) + .build(); + } + + @Override + public String toString() + { + return "Bitmap32ExactCountPostAggregator{" + + "name='" + name + '\'' + + ", field=" + fieldName + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Bitmap32ExactCountPostAggregator that = (Bitmap32ExactCountPostAggregator) o; + return name.equals(that.name) && + fieldName.equals(that.fieldName); + } + + @Override + public int hashCode() + { + return Objects.hash(name, fieldName); + } +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/RoaringBitmap32Counter.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/RoaringBitmap32Counter.java new file mode 100644 index 00000000000..c2da7722223 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/RoaringBitmap32Counter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.exactcount.bitmap32; + +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class RoaringBitmap32Counter implements Bitmap32Counter +{ + + private boolean mutable = false; + private MutableRoaringBitmap mutableBitmap; + private final List<ImmutableRoaringBitmap> bitmaps = new ArrayList<>(); + + public RoaringBitmap32Counter(boolean mutable) + { + this.mutable = mutable; + if (mutable) { + this.mutableBitmap = new MutableRoaringBitmap(); + this.bitmaps.add(mutableBitmap); + } + } + + public RoaringBitmap32Counter(ImmutableRoaringBitmap bitmap) + { + this.bitmaps.add(bitmap); + } + + @Override + public void add(int value) + { + // to meet business requirement, we use the literal int value 0 to represent the null value, so here we will not count value 0 + if (value != 0) { + mutableBitmap.add(value); + } + } + + @Override + public long getCardinality() + { + if (mutable) { + return mutableBitmap.getCardinality(); + } + return ImmutableRoaringBitmap.or(bitmaps.iterator()).getCardinality(); + } + + @Override + public Bitmap32Counter fold(Bitmap32Counter rhs) + { + if (rhs != null) { + bitmaps.addAll(((RoaringBitmap32Counter) rhs).bitmaps); + } + return this; + } + + @Override + public byte[] toBytes() + { + if (mutable) { + // dump to disk + mutableBitmap.runOptimize(); + return toBytes(mutableBitmap); + } + MutableRoaringBitmap bitmap = 
ImmutableRoaringBitmap.or(bitmaps.iterator()); + bitmap.runOptimize(); + return toBytes(bitmap.toImmutableRoaringBitmap()); + } + + private static byte[] toBytes(ImmutableRoaringBitmap bitmap) + { + try { + ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes()); + bitmap.serialize(buffer); + return buffer.array(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/sql/Bitmap32ExactCountSqlAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/sql/Bitmap32ExactCountSqlAggregator.java new file mode 100644 index 00000000000..636e21ba24c --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap32/sql/Bitmap32ExactCountSqlAggregator.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap32.sql;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.exactcount.bitmap32.Bitmap32ExactCountBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.exactcount.bitmap32.Bitmap32ExactCountMergeAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Planner hook for the {@code BITMAP32_EXACT_COUNT(column)} SQL aggregate (non-null BIGINT result).
+ *
+ * <p>A direct reference to a COMPLEX-typed column is merged with {@link Bitmap32ExactCountMergeAggregatorFactory};
+ * any other argument is collected into a fresh bitmap with {@link Bitmap32ExactCountBuildAggregatorFactory}.
+ */
+public class Bitmap32ExactCountSqlAggregator implements SqlAggregator
+{
+  private static final String NAME = "BITMAP32_EXACT_COUNT";
+  private static final SqlAggFunction FUNCTION_INSTANCE =
+      OperatorConversions.aggregatorBuilder(NAME)
+                         .operandNames("column")
+                         .operandTypes(SqlTypeFamily.ANY)
+                         .operandTypeInference(InferTypes.VARCHAR_1024)
+                         .requiredOperandCount(1)
+                         .returnTypeNonNull(SqlTypeName.BIGINT)
+                         .functionCategory(SqlFunctionCategory.NUMERIC)
+                         .build();
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return FUNCTION_INSTANCE;
+  }
+
+  /**
+   * Translates the SQL call into a native aggregation.
+   *
+   * @return the aggregation, or {@code null} when the argument cannot be expressed as a Druid expression
+   * @throws ISE if the argument's SQL type has no Druid equivalent
+   */
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      PlannerContext plannerContext,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
+      RexBuilder rexBuilder,
+      String name,
+      AggregateCall aggregateCall,
+      Project project,
+      List<Aggregation> existingAggregations,
+      boolean finalizeAggregations
+  )
+  {
+    // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
+    // for string columns.
+    final RexNode argNode = Expressions.fromFieldAccess(
+        rexBuilder.getTypeFactory(),
+        rowSignature,
+        project,
+        aggregateCall.getArgList().get(0)
+    );
+
+    final DruidExpression argExpression = Expressions.toDruidExpression(plannerContext, rowSignature, argNode);
+    if (argExpression == null) {
+      return null;
+    }
+
+    // When finalizing, the raw aggregator gets a prefixed name and a post-aggregator exposes the final value.
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+
+    final AggregatorFactory factory;
+    if (isComplexColumnReference(rowSignature, argExpression)) {
+      factory = new Bitmap32ExactCountMergeAggregatorFactory(aggregatorName, argExpression.getDirectColumn());
+    } else {
+      final String inputColumn =
+          resolveBuildInputColumn(argExpression, argNode, aggregatorName, virtualColumnRegistry);
+      factory = new Bitmap32ExactCountBuildAggregatorFactory(aggregatorName, inputColumn);
+    }
+
+    return toAggregation(name, finalizeAggregations, factory);
+  }
+
+  /**
+   * Returns true when {@code expression} is a plain column reference whose type in {@code signature} is COMPLEX.
+   */
+  private static boolean isComplexColumnReference(final RowSignature signature, final DruidExpression expression)
+  {
+    if (!expression.isDirectColumnAccess()) {
+      return false;
+    }
+    return signature.getColumnType(expression.getDirectColumn())
+                    .map(columnType -> columnType.is(ValueType.COMPLEX))
+                    .orElse(false);
+  }
+
+  /**
+   * Returns the column the build aggregator should read. For direct column access this is the column itself.
+   * Otherwise it is a virtual column registered for the expression. The argument's type is validated first,
+   * so nothing is registered for an untranslatable type.
+   */
+  private static String resolveBuildInputColumn(
+      final DruidExpression expression,
+      final RexNode rexNode,
+      final String aggregatorName,
+      final VirtualColumnRegistry registry
+  )
+  {
+    final RelDataType relType = rexNode.getType();
+    final ColumnType druidType = Calcites.getColumnTypeForRelDataType(relType);
+    if (druidType == null) {
+      throw new ISE(
+          "Cannot translate sqlTypeName[%s] to Druid type for field[%s]",
+          relType.getSqlTypeName(),
+          aggregatorName
+      );
+    }
+
+    final DimensionSpec spec;
+    if (expression.isDirectColumnAccess()) {
+      spec = expression.getSimpleExtraction().toDimensionSpec(null, druidType);
+    } else {
+      final String virtualColumn = registry.getOrCreateVirtualColumnForExpression(expression, relType);
+      spec = new DefaultDimensionSpec(virtualColumn, null, druidType);
+    }
+    return spec.getDimension();
+  }
+
+  /**
+   * Wraps {@code aggregatorFactory} in an {@link Aggregation}. When finalizing, the aggregation also carries a
+   * post-aggregator named {@code name} that finalizes the factory's output.
+   */
+  private Aggregation toAggregation(
+      String name,
+      boolean finalizeAggregations,
+      AggregatorFactory aggregatorFactory
+  )
+  {
+    final FinalizingFieldAccessPostAggregator finalizer;
+    if (finalizeAggregations) {
+      finalizer = new FinalizingFieldAccessPostAggregator(name, aggregatorFactory.getName());
+    } else {
+      finalizer = null;
+    }
+    return Aggregation.create(Collections.singletonList(aggregatorFactory), finalizer);
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64Counter.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64Counter.java
new file mode 100644
index 00000000000..3403243298f
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64Counter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A mutable collection of 64-bit values that can report its cardinality, be merged with another counter, and be
+ * serialized.
+ */
+public interface Bitmap64Counter
+{
+  /**
+   * Records {@code value} in this counter.
+   */
+  void add(long value);
+
+  /**
+   * Returns how many values this counter currently holds.
+   */
+  long getCardinality();
+
+  /**
+   * Merges {@code rhs} into this counter. Callers in this extension ignore the return value and rely on the
+   * receiver being updated in place (implementation-defined; confirm against the concrete class).
+   */
+  Bitmap64Counter fold(Bitmap64Counter rhs);
+
+  /**
+   * Returns a serialized form of this counter.
+   */
+  ByteBuffer toByteBuffer();
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountAggregatorFactory.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountAggregatorFactory.java
new file mode 100644
index 00000000000..53e036cd6b9
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountAggregatorFactory.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.aggregation.AggregateCombiner;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for both build and merge factories.
+ *
+ * <p>Holds the shared {@code name}/{@code fieldName} pair. It implements the behaviour that does not depend on how
+ * input rows are read: deserialization, combining, finalization to the counter's cardinality, ordering, cache keys
+ * and equality. Subclasses supply the aggregators and a distinct cache type id.
+ */
+@SuppressWarnings("NullableProblems")
+public abstract class Bitmap64ExactCountAggregatorFactory extends AggregatorFactory
+{
+  static final int MAX_INTERMEDIATE_SIZE = 5 * 1024 * 1024; // 5 MB
+  /** Orders counters by cardinality; null sorts first. */
+  static final Comparator<Bitmap64Counter> COMPARATOR =
+      Comparator.nullsFirst(Comparator.comparingLong(Bitmap64Counter::getCardinality));
+
+  private final String name;
+  private final String fieldName;
+
+  Bitmap64ExactCountAggregatorFactory(
+      final String name,
+      final String fieldName
+  )
+  {
+    this.name = Objects.requireNonNull(name);
+    this.fieldName = Objects.requireNonNull(fieldName);
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Collections.singletonList(fieldName);
+  }
+
+  /**
+   * This is a convoluted way to return a list of input field names this aggregator needs.
+   * Currently the returned factories are only used to obtain a field name by calling getName() method.
+   */
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(
+        new Bitmap64ExactCountBuildAggregatorFactory(fieldName, fieldName)
+    );
+  }
+
+  /**
+   * Converts a serialized value (Base64 string, byte array, or an existing counter) into a counter.
+   */
+  @Override
+  public Bitmap64Counter deserialize(final Object object)
+  {
+    return Bitmap64ExactCountMergeComplexMetricSerde.deserializeRoaringBitmap64Counter(object);
+  }
+
+  /**
+   * Folds {@code objectB} into {@code objectA} and returns {@code objectA}. A null operand is treated as absent,
+   * and the other operand is returned unchanged.
+   */
+  @Override
+  public Bitmap64Counter combine(final Object objectA, final Object objectB)
+  {
+    if (objectB == null) {
+      return (Bitmap64Counter) objectA;
+    }
+    if (objectA == null) {
+      return (Bitmap64Counter) objectB;
+    }
+    ((Bitmap64Counter) objectA).fold((Bitmap64Counter) objectB);
+    return (Bitmap64Counter) objectA;
+  }
+
+  /**
+   * Returns a combiner that accumulates selector values into a private union counter. Null values are skipped,
+   * consistent with {@link #combine(Object, Object)}.
+   */
+  @Override
+  public AggregateCombiner makeAggregateCombiner()
+  {
+    return new ObjectAggregateCombiner<Bitmap64Counter>()
+    {
+      // Initialised eagerly so getObject() never returns null, even before reset() is called.
+      private Bitmap64Counter union = new RoaringBitmap64Counter();
+
+      @Override
+      public void reset(final ColumnValueSelector selector)
+      {
+        union = new RoaringBitmap64Counter();
+        fold(selector);
+      }
+
+      @Override
+      public void fold(final ColumnValueSelector selector)
+      {
+        final Bitmap64Counter other = (Bitmap64Counter) selector.getObject();
+        // A null row contributes no values; don't pass it on to the counter's fold().
+        if (other != null) {
+          union.fold(other);
+        }
+      }
+
+      @Nullable
+      @Override
+      public Bitmap64Counter getObject()
+      {
+        return union;
+      }
+
+      @Override
+      public Class<Bitmap64Counter> classOfObject()
+      {
+        return Bitmap64Counter.class;
+      }
+    };
+  }
+
+  /**
+   * Reduces a counter to its cardinality (a {@code long}); null stays null.
+   */
+  @Nullable
+  @Override
+  public Object finalizeComputation(@Nullable final Object object)
+  {
+    if (object == null) {
+      return null;
+    }
+    return ((Bitmap64Counter) object).getCardinality();
+  }
+
+  @Override
+  public Comparator<Bitmap64Counter> getComparator()
+  {
+    return COMPARATOR;
+  }
+
+  /**
+   * Partial results are re-aggregated by a merge factory that reads this aggregator's own output column.
+   */
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new Bitmap64ExactCountMergeAggregatorFactory(getName(), getName());
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName).build();
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return MAX_INTERMEDIATE_SIZE;
+  }
+
+  /**
+   * Two factories are equal when they are of the same concrete class and share name and field name.
+   */
+  @Override
+  public boolean equals(final Object object)
+  {
+    if (this == object) {
+      return true;
+    }
+    if (object == null || !getClass().equals(object.getClass())) {
+      return false;
+    }
+    final Bitmap64ExactCountAggregatorFactory that = (Bitmap64ExactCountAggregatorFactory) object;
+    if (!name.equals(that.getName())) {
+      return false;
+    }
+    return fieldName.equals(that.getFieldName());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " {"
+           + " name=" + name
+           + ", fieldName=" + fieldName
+           + " }";
+  }
+
+  /**
+   * Cache type id distinguishing build from merge factories in {@link #getCacheKey()}.
+   */
+  protected abstract byte getCacheTypeId();
+
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregator.java
new file mode 100644
index 00000000000..f286cb1287a
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * On-heap aggregator that adds each non-null long input value to a {@link Bitmap64Counter}.
+ * {@link #get()} returns the counter itself; the primitive getters are unsupported.
+ */
+public class Bitmap64ExactCountBuildAggregator implements Aggregator
+{
+  private final BaseLongColumnValueSelector selector;
+  private Bitmap64Counter bitmap;
+
+  public Bitmap64ExactCountBuildAggregator(BaseLongColumnValueSelector selector)
+  {
+    this.selector = selector;
+    this.bitmap = new RoaringBitmap64Counter();
+  }
+
+  @Override
+  public void aggregate()
+  {
+    // A null row has no value to count. Reading it via getLong() would yield a placeholder (typically 0)
+    // and wrongly add that value to the distinct set.
+    if (selector.isNull()) {
+      return;
+    }
+    bitmap.add(selector.getLong());
+  }
+
+  @Nullable
+  @Override
+  public Object get()
+  {
+    return bitmap;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Drops the reference to the counter so it can be garbage collected; the aggregator is unusable afterwards.
+   */
+  @Override
+  public void close()
+  {
+    bitmap = null;
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregatorFactory.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregatorFactory.java
new file mode 100644
index 00000000000..3f436f07167
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildAggregatorFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.ColumnType;
+
+/**
+ * Factory for aggregators that build a bitmap from a raw input column read through a long selector.
+ * Intermediate values are of complex type {@link Bitmap64ExactCountModule#BUILD_TYPE_NAME}; finalized results are
+ * {@code LONG}.
+ */
+@SuppressWarnings("NullableProblems")
+public class Bitmap64ExactCountBuildAggregatorFactory extends Bitmap64ExactCountAggregatorFactory
+{
+  public static final ColumnType TYPE = ColumnType.ofComplex(Bitmap64ExactCountModule.BUILD_TYPE_NAME);
+
+  @JsonCreator
+  public Bitmap64ExactCountBuildAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName
+  )
+  {
+    super(name, fieldName);
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return ColumnType.LONG;
+  }
+
+  @Override
+  protected byte getCacheTypeId()
+  {
+    return AggregatorUtil.BITMAP64_EXACT_COUNT_BUILD_CACHE_TYPE_ID;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    final String column = getFieldName();
+    return new Bitmap64ExactCountBuildAggregator(metricFactory.makeColumnValueSelector(column));
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    final String column = getFieldName();
+    return new Bitmap64ExactCountBuildBufferAggregator(metricFactory.makeColumnValueSelector(column));
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildBufferAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildBufferAggregator.java
new file mode 100644
index 00000000000..24c6723c7a0
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildBufferAggregator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Buffer aggregator that builds a {@link Bitmap64Counter} from non-null long input values.
+ *
+ * <p>Counters live on-heap in a map keyed by buffer identity and slot position; the buffer's bytes are never read
+ * or written, and its position is left untouched.
+ */
+public class Bitmap64ExactCountBuildBufferAggregator implements BufferAggregator
+{
+  private final BaseLongColumnValueSelector selector;
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Bitmap64Counter>> collectors = new IdentityHashMap<>();
+
+  public Bitmap64ExactCountBuildBufferAggregator(BaseLongColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    putCollector(buf, position, new RoaringBitmap64Counter());
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    // Null rows carry no value; getLong() would report a placeholder (typically 0) and inflate the count.
+    if (selector.isNull()) {
+      return;
+    }
+    getOrCreateCollector(buf, position).add(selector.getLong());
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return getOrCreateCollector(buf, position);
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Releases all on-heap counters held for every buffer.
+   */
+  @Override
+  public void close()
+  {
+    collectors.clear();
+  }
+
+  /**
+   * Moves the counter of slot {@code (oldBuffer, oldPosition)} to {@code (newBuffer, newPosition)}.
+   * The old slot is removed before the new one is written, so relocating onto the same slot keeps the counter.
+   * A missing old slot yields an empty counter at the new slot instead of a NullPointerException.
+   */
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    Bitmap64Counter moved = null;
+    final Int2ObjectMap<Bitmap64Counter> oldMap = collectors.get(oldBuffer);
+    if (oldMap != null) {
+      moved = oldMap.remove(oldPosition);
+      if (oldMap.isEmpty()) {
+        collectors.remove(oldBuffer);
+      }
+    }
+    putCollector(newBuffer, newPosition, moved != null ? moved : new RoaringBitmap64Counter());
+  }
+
+  /** Stores {@code collector} for the given slot, creating the per-buffer map on first use. */
+  private void putCollector(final ByteBuffer buffer, final int position, final Bitmap64Counter collector)
+  {
+    collectors.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>()).put(position, collector);
+  }
+
+  /** Returns the counter for the given slot, creating and registering an empty one if none exists. */
+  private Bitmap64Counter getOrCreateCollector(ByteBuffer buf, int position)
+  {
+    final Int2ObjectMap<Bitmap64Counter> map = collectors.get(buf);
+    final Bitmap64Counter existing = map == null ? null : map.get(position);
+    if (existing != null) {
+      return existing;
+    }
+    final Bitmap64Counter created = new RoaringBitmap64Counter();
+    putCollector(buf, position, created);
+    return created;
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildComplexMetricSerde.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildComplexMetricSerde.java
new file mode 100644
index 00000000000..ed11da37af0
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountBuildComplexMetricSerde.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+
+import javax.annotation.Nullable;
+
+/**
+ * Serde for the build type. It reuses the merge serde's storage format but passes raw input values through
+ * unchanged at ingestion time instead of decoding them into counters.
+ */
+public class Bitmap64ExactCountBuildComplexMetricSerde extends Bitmap64ExactCountMergeComplexMetricSerde
+{
+  private static final ComplexMetricExtractor EXTRACTOR = new RawValueExtractor();
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return EXTRACTOR;
+  }
+
+  /**
+   * Extractor that returns {@code inputRow.getRaw(metricName)} as-is (possibly null).
+   */
+  private static final class RawValueExtractor implements ComplexMetricExtractor
+  {
+    @Override
+    public Class extractedClass()
+    {
+      return Object.class;
+    }
+
+    @Nullable
+    @Override
+    public Object extractValue(InputRow inputRow, String metricName)
+    {
+      return inputRow.getRaw(metricName);
+    }
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregator.java
new file mode 100644
index 00000000000..c5dabe4e8f6
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * On-heap aggregator that unions pre-built {@link Bitmap64Counter} values read from a complex column.
+ * {@link #get()} returns the accumulated counter; the primitive getters are unsupported.
+ */
+public class Bitmap64ExactCountMergeAggregator implements Aggregator
+{
+  private final ColumnValueSelector<Bitmap64Counter> selector;
+  private Bitmap64Counter bitmap;
+
+  public Bitmap64ExactCountMergeAggregator(ColumnValueSelector<Bitmap64Counter> selector)
+  {
+    this.selector = selector;
+    this.bitmap = new RoaringBitmap64Counter();
+  }
+
+  @Override
+  public void aggregate()
+  {
+    final Bitmap64Counter incoming = selector.getObject();
+    // Rows without a stored bitmap contribute nothing; the buffered merge aggregator skips them the same way.
+    if (incoming != null) {
+      bitmap.fold(incoming);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Object get()
+  {
+    return bitmap;
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Drops the reference to the counter; the aggregator is unusable afterwards.
+   */
+  @Override
+  public void close()
+  {
+    bitmap = null;
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregatorFactory.java
b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregatorFactory.java new file mode 100644 index 00000000000..f17910d9813 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeAggregatorFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.ColumnType;
+
+/**
+ * Factory for aggregators that union pre-built {@link Bitmap64Counter} values stored in a complex column.
+ * Intermediate values are of complex type {@link Bitmap64ExactCountModule#MERGE_TYPE_NAME}; finalized results are
+ * {@code LONG}.
+ */
+@SuppressWarnings("NullableProblems")
+public class Bitmap64ExactCountMergeAggregatorFactory extends Bitmap64ExactCountAggregatorFactory
+{
+  public static final ColumnType TYPE = ColumnType.ofComplex(Bitmap64ExactCountModule.MERGE_TYPE_NAME);
+
+  @JsonCreator
+  public Bitmap64ExactCountMergeAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName
+  )
+  {
+    super(name, fieldName);
+  }
+
+  @Override
+  public ColumnType getIntermediateType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public ColumnType getResultType()
+  {
+    return ColumnType.LONG;
+  }
+
+  @Override
+  protected byte getCacheTypeId()
+  {
+    return AggregatorUtil.BITMAP64_EXACT_COUNT_MERGE_CACHE_TYPE_ID;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new Bitmap64ExactCountMergeAggregator(counterSelector(metricFactory));
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new Bitmap64ExactCountMergeBufferAggregator(counterSelector(metricFactory));
+  }
+
+  /** Creates a selector over this factory's input column, typed as producing counters. */
+  private ColumnValueSelector<Bitmap64Counter> counterSelector(ColumnSelectorFactory metricFactory)
+  {
+    return metricFactory.makeColumnValueSelector(getFieldName());
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeBufferAggregator.java
b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeBufferAggregator.java new file mode 100644 index 00000000000..3cc89c962c4 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeBufferAggregator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+
+/**
+ * Buffer aggregator that unions pre-built {@link Bitmap64Counter} values read from a complex column.
+ *
+ * <p>Counters live on-heap in a map keyed by buffer identity and slot position; the buffer's bytes are never read
+ * or written.
+ */
+public class Bitmap64ExactCountMergeBufferAggregator implements BufferAggregator
+{
+  private final ColumnValueSelector<Bitmap64Counter> selector;
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Bitmap64Counter>> counterCache = new IdentityHashMap<>();
+
+  public Bitmap64ExactCountMergeBufferAggregator(ColumnValueSelector<Bitmap64Counter> selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    addToCache(buf, position, new RoaringBitmap64Counter());
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    final Bitmap64Counter incoming = selector.getObject();
+    if (incoming == null) {
+      return;
+    }
+    // Fold through the interface: any Bitmap64Counter implementation is accepted, not just RoaringBitmap64Counter.
+    getOrCreateCounter(buf, position).fold(incoming);
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    return getOrCreateCounter(buf, position);
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Releases all on-heap counters held for every buffer.
+   */
+  @Override
+  public void close()
+  {
+    counterCache.clear();
+  }
+
+  /**
+   * Moves the counter of slot {@code (oldBuffer, oldPosition)} to {@code (newBuffer, newPosition)}.
+   * The old slot is removed before the new one is written, so relocating onto the same slot keeps the counter.
+   * A missing old slot yields an empty counter instead of a NullPointerException.
+   */
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+  {
+    Bitmap64Counter moved = null;
+    final Int2ObjectMap<Bitmap64Counter> oldMap = counterCache.get(oldBuffer);
+    if (oldMap != null) {
+      moved = oldMap.remove(oldPosition);
+      if (oldMap.isEmpty()) {
+        counterCache.remove(oldBuffer);
+      }
+    }
+    addToCache(newBuffer, newPosition, moved != null ? moved : new RoaringBitmap64Counter());
+  }
+
+  /** Stores {@code counter} for the given slot, creating the per-buffer map on first use. */
+  private void addToCache(final ByteBuffer buffer, final int position, final Bitmap64Counter counter)
+  {
+    Int2ObjectMap<Bitmap64Counter> map = counterCache.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
+    map.put(position, counter);
+  }
+
+  /** Returns the counter for the given slot, creating and registering an empty one if none exists. */
+  private Bitmap64Counter getOrCreateCounter(final ByteBuffer buf, final int position)
+  {
+    final Int2ObjectMap<Bitmap64Counter> map = counterCache.get(buf);
+    final Bitmap64Counter existing = map == null ? null : map.get(position);
+    if (existing != null) {
+      return existing;
+    }
+    final Bitmap64Counter created = new RoaringBitmap64Counter();
+    addToCache(buf, position, created);
+    return created;
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeComplexMetricSerde.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeComplexMetricSerde.java
new file mode 100644
index 00000000000..7a5ebda49d1
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountMergeComplexMetricSerde.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
+import org.apache.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.druid.segment.serde.ComplexMetricSerde;
+import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Complex-metric serde for 64-bit exact-count bitmaps, stored through {@link Bitmap64ExactCountObjectStrategy}.
+ */
+public class Bitmap64ExactCountMergeComplexMetricSerde extends ComplexMetricSerde
+{
+
+  /**
+   * Converts a Base64 string, a raw byte array, or an existing {@link RoaringBitmap64Counter} (returned as-is)
+   * into a counter.
+   *
+   * @throws IAE for any other input, including {@code null}
+   */
+  static RoaringBitmap64Counter deserializeRoaringBitmap64Counter(final Object object)
+  {
+    if (object instanceof String) {
+      return RoaringBitmap64Counter.fromBytes(StringUtils.decodeBase64(StringUtils.toUtf8((String) object)));
+    } else if (object instanceof byte[]) {
+      return RoaringBitmap64Counter.fromBytes((byte[]) object);
+    } else if (object instanceof RoaringBitmap64Counter) {
+      return (RoaringBitmap64Counter) object;
+    }
+    // Guard against null so the diagnostic itself cannot throw a NullPointerException.
+    throw new IAE(
+        "Object is not of a type that can be deserialized to a RoaringBitmap64Counter: %s",
+        object == null ? "null" : object.getClass().getName()
+    );
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return Bitmap64ExactCountModule.TYPE_NAME; // must be common type name
+  }
+
+  @Override
+  public ObjectStrategy getObjectStrategy()
+  {
+    return Bitmap64ExactCountObjectStrategy.STRATEGY;
+  }
+
+  /**
+   * Returns an extractor that decodes each row's raw value into a counter. A null raw value yields null.
+   */
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<?> extractedClass()
+      {
+        return RoaringBitmap64Counter.class;
+      }
+
+      @Override
+      public RoaringBitmap64Counter extractValue(final InputRow inputRow, final String metricName)
+      {
+        final Object object = inputRow.getRaw(metricName);
+        if (object == null) {
+          return null;
+        }
+        return deserializeRoaringBitmap64Counter(object);
+      }
+    };
+  }
+
+  @Override
+  public void deserializeColumn(final ByteBuffer buf, final ColumnBuilder columnBuilder)
+  {
+    columnBuilder.setComplexColumnSupplier(
+        new ComplexColumnPartSupplier(
+            getTypeName(),
+            GenericIndexed.read(buf, Bitmap64ExactCountObjectStrategy.STRATEGY, columnBuilder.getFileMapper())
+        )
+    );
+  }
+
+  // support large columns
+  @Override
+  public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column)
+  {
+    return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+  }
+
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountModule.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountModule.java
new file mode 100644
index 00000000000..695c0ec0cf3
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountModule.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.query.aggregation.exactcount.bitmap64.sql.Bitmap64ExactCountSqlAggregator;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.sql.guice.SqlBindings;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid module wiring up the 64-bit exact-count extension: Jackson subtypes for the build/merge aggregator
+ * factories and the cardinality post-aggregator, a JSON serializer for {@link RoaringBitmap64Counter}, the
+ * complex-metric serdes, and the {@code BITMAP64_EXACT_COUNT} SQL aggregator.
+ */
+public class Bitmap64ExactCountModule implements DruidModule
+{
+  public static final String TYPE_NAME = "Bitmap64ExactCount"; // common type name to be associated with segment data
+  public static final String BUILD_TYPE_NAME = "Bitmap64ExactCountBuild";
+  public static final String MERGE_TYPE_NAME = "Bitmap64ExactCountMerge";
+
+  // JSON type name of Bitmap64ExactCountPostAggregator; kept equal to that class's @JsonTypeName value.
+  private static final String CARDINALITY_POST_AGG_TYPE_NAME = "bitmap64ExactCountCardinality";
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    final SimpleModule jacksonModule = new SimpleModule("Bitmap64ExactCountModule");
+    jacksonModule.registerSubtypes(
+        new NamedType(Bitmap64ExactCountMergeAggregatorFactory.class, MERGE_TYPE_NAME),
+        new NamedType(Bitmap64ExactCountBuildAggregatorFactory.class, BUILD_TYPE_NAME),
+        new NamedType(Bitmap64ExactCountPostAggregator.class, CARDINALITY_POST_AGG_TYPE_NAME)
+    );
+    jacksonModule.addSerializer(RoaringBitmap64Counter.class, new RoaringBitmap64CounterJsonSerializer());
+    return Collections.singletonList(jacksonModule);
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    registerSerde();
+    SqlBindings.addAggregator(binder, Bitmap64ExactCountSqlAggregator.class);
+  }
+
+  /**
+   * Registers the complex-metric serdes: TYPE_NAME and MERGE_TYPE_NAME both map to the merge serde, while
+   * BUILD_TYPE_NAME maps to the build serde.
+   */
+  @VisibleForTesting
+  public static void registerSerde()
+  {
+    ComplexMetrics.registerSerde(TYPE_NAME, new Bitmap64ExactCountMergeComplexMetricSerde());
+    ComplexMetrics.registerSerde(BUILD_TYPE_NAME, new Bitmap64ExactCountBuildComplexMetricSerde());
+    ComplexMetrics.registerSerde(MERGE_TYPE_NAME, new Bitmap64ExactCountMergeComplexMetricSerde());
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountObjectStrategy.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountObjectStrategy.java
new file mode 100644
index 00000000000..44fbcf57ad3
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountObjectStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.segment.data.ObjectStrategy;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link ObjectStrategy} converting {@link Bitmap64Counter} values to and from the byte format of
+ * {@link RoaringBitmap64Counter}. A null value is stored as an empty byte array and read back as null.
+ */
+public class Bitmap64ExactCountObjectStrategy implements ObjectStrategy<Bitmap64Counter>
+{
+
+  static final Bitmap64ExactCountObjectStrategy STRATEGY = new Bitmap64ExactCountObjectStrategy();
+
+  @Override
+  public Class<? extends Bitmap64Counter> getClazz()
+  {
+    return RoaringBitmap64Counter.class;
+  }
+
+  /**
+   * Decodes {@code numBytes} bytes starting at the buffer's current position. The caller's buffer position and
+   * limit are left untouched (a read-only view is used); the bytes are copied out before decoding.
+   *
+   * @return the decoded counter, or null for an empty payload or one {@link RoaringBitmap64Counter#fromBytes}
+   *     cannot decode
+   */
+  @Nullable
+  @Override
+  public Bitmap64Counter fromByteBuffer(ByteBuffer buffer, int numBytes)
+  {
+    if (numBytes == 0) {
+      // toBytes(null) writes an empty payload; map it straight back to null without attempting a decode.
+      return null;
+    }
+    final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+    readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
+    final byte[] bytes = new byte[numBytes];
+    readOnlyBuffer.get(bytes);
+    return RoaringBitmap64Counter.fromBytes(bytes);
+  }
+
+  /**
+   * Serializes a counter; null becomes an empty array.
+   */
+  @Nullable
+  @Override
+  public byte[] toBytes(@Nullable Bitmap64Counter val)
+  {
+    if (val == null) {
+      return new byte[0];
+    }
+    final ByteBuffer serialized = val.toByteBuffer();
+    // Fast path: the buffer exactly wraps its whole backing array, so it can be returned without copying.
+    // hasArray() is false for read-only and direct buffers, so array() is never called on those.
+    if (serialized.hasArray()
+        && serialized.arrayOffset() == 0
+        && serialized.position() == 0
+        && serialized.remaining() == serialized.array().length) {
+      return serialized.array();
+    }
+    // Otherwise copy only the readable window; array() alone would ignore position/limit/arrayOffset.
+    final byte[] bytes = new byte[serialized.remaining()];
+    serialized.duplicate().get(bytes);
+    return bytes;
+  }
+
+  @Override
+  public int compare(Bitmap64Counter o1, Bitmap64Counter o2)
+  {
+    return Bitmap64ExactCountAggregatorFactory.COMPARATOR.compare(o1, o2);
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountPostAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountPostAggregator.java
new file mode 100644
index 00000000000..a83df309097
--- 
/dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/Bitmap64ExactCountPostAggregator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Sets;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.aggregation.post.PostAggregatorIds;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Post-aggregator that reports, as a LONG, the cardinality of the {@link Bitmap64Counter} held by the
+ * aggregator named {@code fieldName}.
+ */
+@JsonTypeName("bitmap64ExactCountCardinality")
+public class Bitmap64ExactCountPostAggregator implements PostAggregator
+{
+  private final String name;
+  private final String fieldName;
+
+  @JsonCreator
+  public Bitmap64ExactCountPostAggregator(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") String fieldName
+  )
+  {
+    this.name = name;
+    this.fieldName = fieldName;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @Override
+  public Set<String> getDependentFields()
+  {
+    return Sets.newHashSet(fieldName);
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return ArithmeticPostAggregator.DEFAULT_COMPARATOR;
+  }
+
+  /**
+   * Returns the cardinality of the counter stored under {@code fieldName}.
+   * Not a JSON property: a one-argument method annotated with {@code @JsonProperty} is treated by Jackson as a setter.
+   *
+   * @return the exact distinct count, or 0 when the referenced value is absent/null
+   */
+  @Override
+  public Object compute(Map<String, Object> combinedAggregators)
+  {
+    final Object value = combinedAggregators.get(fieldName);
+    if (value == null) {
+      // No counter for this field: treat it as an empty set instead of failing with a NullPointerException.
+      return 0L;
+    }
+    return ((Bitmap64Counter) value).getCardinality();
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @Nullable
+  @Override
+  public ColumnType getType(ColumnInspector signature)
+  {
+    return ColumnType.LONG;
+  }
+
+  @Override
+  public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
+  {
+    return this;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(PostAggregatorIds.BITMAP64_EXACT_COUNT_MERGE_TYPE_ID)
+        .appendString(fieldName)
+        .build();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "Bitmap64ExactCountPostAggregator{" +
+           "name='" + name + '\'' +
+           ", field=" + fieldName +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Bitmap64ExactCountPostAggregator that = (Bitmap64ExactCountPostAggregator) o;
+    // Null-safe: the JSON creator does not reject missing name/fieldName.
+    return Objects.equals(name, that.name) &&
+           Objects.equals(fieldName, that.fieldName);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName);
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64Counter.java
b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64Counter.java new file mode 100644 index 00000000000..2ab47541db7 --- /dev/null +++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64Counter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link Bitmap64Counter} backed by a {@link Roaring64NavigableMap}. Instances are mutable ({@link #add} and
+ * {@link #fold} change the bitmap in place) and perform no synchronization.
+ */
+public class RoaringBitmap64Counter implements Bitmap64Counter
+{
+  private static final Logger log = new Logger(RoaringBitmap64Counter.class);
+
+  private final Roaring64NavigableMap bitmap;
+
+  public RoaringBitmap64Counter()
+  {
+    this.bitmap = new Roaring64NavigableMap();
+  }
+
+  private RoaringBitmap64Counter(Roaring64NavigableMap bitmap)
+  {
+    this.bitmap = bitmap;
+  }
+
+  /**
+   * Decodes a counter from bytes produced by {@link #toByteBuffer()}.
+   *
+   * @return the decoded counter, or null if {@code bytes} is null/empty or cannot be decoded
+   */
+  @Nullable
+  public static RoaringBitmap64Counter fromBytes(byte[] bytes)
+  {
+    if (bytes == null || bytes.length == 0) {
+      // Bitmap64ExactCountObjectStrategy.toBytes(null) writes an empty payload; there is nothing to decode.
+      return null;
+    }
+    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
+      final Roaring64NavigableMap decoded = new Roaring64NavigableMap();
+      decoded.deserialize(in);
+      return new RoaringBitmap64Counter(decoded);
+    }
+    catch (Exception e) {
+      // Decoding stays best-effort (callers get null), so log at WARN to make corrupt payloads visible.
+      log.warn(e, "Failed to deserialize RoaringBitmap64Counter from [%d] bytes", bytes.length);
+      return null;
+    }
+  }
+
+  @Override
+  public void add(long value)
+  {
+    bitmap.addLong(value);
+  }
+
+  @Override
+  public long getCardinality()
+  {
+    return bitmap.getLongCardinality();
+  }
+
+  /**
+   * Unions {@code rhs} into this counter (bitwise OR), so the cardinality afterwards is the exact number of
+   * distinct values held by either side. Mutates and returns {@code this}; a null {@code rhs} is a no-op.
+   * {@code rhs} is cast to {@link RoaringBitmap64Counter}, so other implementations fail with ClassCastException.
+   */
+  @Override
+  public Bitmap64Counter fold(Bitmap64Counter rhs)
+  {
+    if (rhs == null) {
+      return this;
+    }
+    bitmap.or(((RoaringBitmap64Counter) rhs).bitmap);
+    return this;
+  }
+
+  /**
+   * Serializes the bitmap into a heap buffer that exactly wraps a fresh byte array.
+   * Calls {@code runOptimize()} first, which may re-encode containers in place without changing the stored values.
+   */
+  @Override
+  public ByteBuffer toByteBuffer()
+  {
+    bitmap.runOptimize();
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (DataOutputStream dataOut = new DataOutputStream(out)) {
+      bitmap.serialize(dataOut);
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return ByteBuffer.wrap(out.toByteArray());
+  }
+
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64CounterJsonSerializer.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64CounterJsonSerializer.java
new file mode 100644
index 00000000000..452890ba2c9
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/RoaringBitmap64CounterJsonSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Writes a {@link RoaringBitmap64Counter} as a JSON binary value (Base64 text in Jackson's default variant)
+ * containing the bytes returned by {@link RoaringBitmap64Counter#toByteBuffer()}.
+ */
+public class RoaringBitmap64CounterJsonSerializer extends JsonSerializer<RoaringBitmap64Counter>
+{
+
+  @Override
+  public void serialize(
+      final RoaringBitmap64Counter bitmap64Counter,
+      final JsonGenerator jgen,
+      final SerializerProvider provider
+  )
+      throws IOException
+  {
+    final ByteBuffer serialized = bitmap64Counter.toByteBuffer();
+    if (serialized.hasArray()) {
+      // Emit only the readable window; array() on its own ignores position, limit and arrayOffset.
+      jgen.writeBinary(
+          serialized.array(),
+          serialized.arrayOffset() + serialized.position(),
+          serialized.remaining()
+      );
+    } else {
+      // Read-only or direct buffers expose no backing array, so copy the readable bytes out first.
+      final byte[] bytes = new byte[serialized.remaining()];
+      serialized.duplicate().get(bytes);
+      jgen.writeBinary(bytes);
+    }
+  }
+
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/sql/Bitmap64ExactCountSqlAggregator.java b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/sql/Bitmap64ExactCountSqlAggregator.java
new file mode 100644
index 00000000000..dfa369f0eb5
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/java/org/apache/druid/query/aggregation/exactcount/bitmap64/sql/Bitmap64ExactCountSqlAggregator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.exactcount.bitmap64.sql;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.exactcount.bitmap64.Bitmap64ExactCountBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.exactcount.bitmap64.Bitmap64ExactCountMergeAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * SQL binding for {@code BITMAP64_EXACT_COUNT(column)}, returning a non-null BIGINT.
+ * A directly referenced COMPLEX column is aggregated with {@link Bitmap64ExactCountMergeAggregatorFactory};
+ * any other input (plain column or expression) is aggregated with {@link Bitmap64ExactCountBuildAggregatorFactory}.
+ */
+public class Bitmap64ExactCountSqlAggregator implements SqlAggregator
+{
+
+  private static final String NAME = "BITMAP64_EXACT_COUNT";
+  private static final SqlAggFunction FUNCTION_INSTANCE =
+      OperatorConversions.aggregatorBuilder(NAME)
+                         .operandNames("column")
+                         .operandTypes(SqlTypeFamily.ANY)
+                         .operandTypeInference(InferTypes.VARCHAR_1024)
+                         .requiredOperandCount(1)
+                         .returnTypeNonNull(SqlTypeName.BIGINT)
+                         .functionCategory(SqlFunctionCategory.NUMERIC)
+                         .build();
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return FUNCTION_INSTANCE;
+  }
+
+  /**
+   * Translates the SQL call into a build or merge aggregator.
+   *
+   * @return the aggregation, or null when the argument cannot be translated to a Druid expression
+   * @throws ISE if the argument's SQL type has no Druid column type
+   */
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      PlannerContext plannerContext,
+      RowSignature rowSignature,
+      VirtualColumnRegistry virtualColumnRegistry,
+      RexBuilder rexBuilder,
+      String name,
+      AggregateCall aggregateCall,
+      Project project,
+      List<Aggregation> existingAggregations,
+      boolean finalizeAggregations
+  )
+  {
+    // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access
+    // for string columns.
+    final RexNode columnRexNode = Expressions.fromFieldAccess(
+        rexBuilder.getTypeFactory(),
+        rowSignature,
+        project,
+        aggregateCall.getArgList().get(0)
+    );
+
+    final DruidExpression columnArg = Expressions.toDruidExpression(plannerContext, rowSignature, columnRexNode);
+    if (columnArg == null) {
+      return null;
+    }
+
+    final AggregatorFactory aggregatorFactory;
+    // When finalizing, the raw aggregator gets a prefixed name and a finalizing post-aggregator exposes `name`.
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+
+    if (columnArg.isDirectColumnAccess()
+        && rowSignature.getColumnType(columnArg.getDirectColumn())
+                       .map(type -> type.is(ValueType.COMPLEX))
+                       .orElse(false)) {
+      aggregatorFactory = new Bitmap64ExactCountMergeAggregatorFactory(
+          aggregatorName,
+          columnArg.getDirectColumn()
+      );
+    } else {
+      final RelDataType dataType = columnRexNode.getType();
+      final ColumnType inputType = Calcites.getColumnTypeForRelDataType(dataType);
+      if (inputType == null) {
+        throw new ISE(
+            "Cannot translate sqlTypeName[%s] to Druid type for field[%s]",
+            dataType.getSqlTypeName(),
+            aggregatorName
+        );
+      }
+
+      // The build aggregator only needs a field name: the column itself for direct access (which never carries an
+      // extraction function), otherwise a virtual column that evaluates the expression.
+      final String fieldName;
+      if (columnArg.isDirectColumnAccess()) {
+        fieldName = columnArg.getDirectColumn();
+      } else {
+        fieldName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(columnArg, dataType);
+      }
+
+      aggregatorFactory = new Bitmap64ExactCountBuildAggregatorFactory(aggregatorName, fieldName);
+    }
+
+    return toAggregation(name, finalizeAggregations, aggregatorFactory);
+  }
+
+  /**
+   * Wraps the aggregator factory, adding a {@link FinalizingFieldAccessPostAggregator} named {@code name} when
+   * {@code finalizeAggregations} is set.
+   */
+  private Aggregation toAggregation(
+      String name,
+      boolean finalizeAggregations,
+      AggregatorFactory aggregatorFactory
+  )
+  {
+    return Aggregation.create(
+        Collections.singletonList(aggregatorFactory),
+        finalizeAggregations ? new FinalizingFieldAccessPostAggregator(
+            name,
+            aggregatorFactory.getName()
+        ) : null
+    );
+  }
+}
diff --git a/extensions-contrib/druid-exactcount/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/druid-exactcount/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 00000000000..8618cf4efb1
--- /dev/null
+++ b/extensions-contrib/druid-exactcount/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +org.apache.druid.query.aggregation.exactcount.bitmap64.Bitmap64ExactCountModule +org.apache.druid.query.aggregation.exactcount.bitmap32.Bitmap32ExactCountModule diff --git a/pom.xml b/pom.xml index f4a1bcea812..63d556bb325 100644 --- a/pom.xml +++ b/pom.xml @@ -209,6 +209,7 @@ <module>extensions-contrib/cloudfiles-extensions</module> <module>extensions-contrib/graphite-emitter</module> <module>extensions-contrib/distinctcount</module> + <module>extensions-contrib/druid-exactcount</module> <module>extensions-contrib/statsd-emitter</module> <module>extensions-contrib/time-min-max</module> <module>extensions-contrib/virtual-columns</module> diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index c4c9a7875ef..9c265c7cebf 100755 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -167,6 +167,15 @@ public class AggregatorUtil // DDSketch aggregator public static final byte DDSKETCH_CACHE_TYPE_ID = 0x50; + // Bitmap64 exact count aggregator + public static final byte BITMAP64_EXACT_COUNT_BUILD_CACHE_TYPE_ID = 0x60; + public static final byte BITMAP64_EXACT_COUNT_MERGE_CACHE_TYPE_ID = 0x61; + + // Bitmap32 exact count aggregator + public static final byte BITMAP32_EXACT_COUNT_BUILD_CACHE_TYPE_ID = 0x62; + public static final byte BITMAP32_EXACT_COUNT_MERGE_CACHE_TYPE_ID = 0x63; + + /** * Given a list of PostAggregators and the name of an output column, returns the minimal list of PostAggregators * required to compute the output column. 
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java index 9d097b6e4f2..e904596a301 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java @@ -70,4 +70,6 @@ public class PostAggregatorIds public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID = 46; public static final byte DDSKETCH_QUANTILES_TYPE_ID = 51; public static final byte DDSKETCH_QUANTILE_TYPE_ID = 52; + public static final byte BITMAP64_EXACT_COUNT_MERGE_TYPE_ID = 53; + public static final byte BITMAP32_EXACT_COUNT_MERGE_TYPE_ID = 54; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
