This is an automated email from the ASF dual-hosted git repository. kangkaisen pushed a commit to branch kylin-on-druid in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-druid by this push: new 4b6668a Add patch for precise-distinct-count,extend-column,decima measure 4b6668a is described below commit 4b6668af91c14a318b3e06a7ac8deef91a1b326b Author: kangkaisen <kangkai...@meituan.com> AuthorDate: Thu Dec 27 20:43:51 2018 +0800 Add patch for precise-distinct-count,extend-column,decima measure --- ...tinct-count-extend-column-decimal-measure.patch | 3960 ++++++++++++++++++++ 1 file changed, 3960 insertions(+) diff --git a/kod-precise-distinct-count-extend-column-decimal-measure.patch b/kod-precise-distinct-count-extend-column-decimal-measure.patch new file mode 100644 index 0000000..b7a0539 --- /dev/null +++ b/kod-precise-distinct-count-extend-column-decimal-measure.patch @@ -0,0 +1,3960 @@ +From 4e7daa032d5e6867666b65843e2804ffe1de2c67 Mon Sep 17 00:00:00 2001 +From: kangkaisen <kangkai...@meituan.com> +Date: Wed, 15 Nov 2017 19:30:54 +0800 +Subject: [PATCH] Druid support kylin precise distinct count measure Druid + support kylin extend-column measure Druid support decimal measure + +--- + .../bitmap/WrappedImmutableRoaringBitmap.java | 2 + + distribution/pom.xml | 12 + + extensions-contrib/decimal/pom.xml | 59 +++++ + .../decimal/DecimalAggregatorFactory.java | 114 ++++++++ + .../decimal/DecimalBufferAggregator.java | 99 +++++++ + .../decimal/DecimalDruidModule.java | 71 +++++ + .../decimal/DecimalMaxAggregator.java | 82 ++++++ + .../decimal/DecimalMaxAggregatorFactory.java | 160 +++++++++++ + .../decimal/DecimalMaxBufferAggregator.java | 56 ++++ + .../aggregation/decimal/DecimalMaxSerde.java | 29 ++ + .../decimal/DecimalMinAggregator.java | 83 ++++++ + .../decimal/DecimalMinAggregatorFactory.java | 149 +++++++++++ + .../decimal/DecimalMinBufferAggregator.java | 56 ++++ + .../aggregation/decimal/DecimalMinSerde.java | 29 ++ + .../aggregation/decimal/DecimalSerde.java | 135 ++++++++++ + .../decimal/DecimalSumAggregator.java | 80 ++++++ + .../decimal/DecimalSumAggregatorFactory.java | 143 ++++++++++ + .../decimal/DecimalSumBufferAggregator.java | 54 ++++ + .../aggregation/decimal/DecimalSumSerde.java | 29 ++ + .../io.druid.initialization.DruidModule | 1 + + .../DecimalGroupByQueryTest.java | 250 ++++++++++++++++++ + extensions-contrib/kylin-distinccount/pom.xml | 59 +++++ + .../DistinctCountAggregator.java | 73 +++++ + .../DistinctCountAggregatorFactory.java | 223 ++++++++++++++++ + .../DistinctCountBufferAggregator.java | 120 +++++++++ + .../DistinctCountDruidModule.java | 53 ++++ + .../DistinctCountSerde.java | 95 +++++++ + .../io.druid.initialization.DruidModule | 1 + + .../DistinctCountBufferAggregatorTest.java | 71 +++++ + .../DistinctCountGroupByQueryTest.java | 234 ++++++++++++++++ + extensions-contrib/kylin-extendcolumn/pom.xml | 59 +++++ + .../ExtendColumnAggregator.java | 83 ++++++ + .../ExtendColumnAggregatorFactory.java | 227 ++++++++++++++++ + .../ExtendColumnBufferAggregator.java | 117 ++++++++ + .../ExtendColumnDruidModule.java | 51 ++++ + .../kylin.extendcolumn/ExtendColumnSerde.java | 139 ++++++++++ + .../io.druid.initialization.DruidModule | 1 + + .../ExtendColumnGroupByQueryTest.java | 229 ++++++++++++++++ + pom.xml | 3 + + .../DruidDefaultSerializersModule.java | 18 ++ + .../io/druid/segment/IndexMergerTestBase.java | 2 +- + .../java/io/druid/segment/TestHelper.java | 9 +- + 42 files changed, 3558 insertions(+), 2 deletions(-) + create mode 100644 extensions-contrib/decimal/pom.xml + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java + create mode 100644 extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java + create mode 100644 extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule + create mode 100644 extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java + create mode 100644 extensions-contrib/kylin-distinccount/pom.xml + create mode 100644 extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java + create mode 100644 extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java + create mode 100644 extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java + create mode 100644 extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java + create mode 100644 extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java + create mode 100644 extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule + create mode 100644 extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java + create mode 100644 extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java + create mode 100644 extensions-contrib/kylin-extendcolumn/pom.xml + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java + create mode 100644 extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule + create mode 100644 extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java + +diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java +index 26454d07c..853e3b581 100755 +--- a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java ++++ b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java +@@ -19,6 +19,7 @@ + + package io.druid.collections.bitmap; + ++import com.fasterxml.jackson.annotation.JsonValue; + import com.google.common.base.Throwables; + import org.roaringbitmap.IntIterator; + import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +@@ -55,6 +56,7 @@ public class WrappedImmutableRoaringBitmap implements ImmutableBitmap + } + + @Override ++ @JsonValue + public byte[] toBytes() + { + try { +diff --git a/distribution/pom.xml b/distribution/pom.xml +index 751d1ded4..44192a66a 100644 +--- a/distribution/pom.xml ++++ b/distribution/pom.xml +@@ -111,6 +111,12 @@ + <argument>io.druid.extensions:druid-examples</argument> + <argument>-c</argument> + <argument>io.druid.extensions:simple-client-sslcontext</argument> ++ <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-kylin-distinctcount</argument> ++ <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-kylin-extendcolumn</argument> ++ <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-decimal</argument> + <argument>${druid.distribution.pulldeps.opts}</argument> + </arguments> + </configuration> +@@ -215,6 +221,12 @@ + <argument>-c</argument> + <argument>io.druid.extensions.contrib:druid-distinctcount</argument> + <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-kylin-distinctcount</argument> ++ <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-kylin-extendcolumn</argument> ++ <argument>-c</argument> ++ <argument>io.druid.extensions.contrib:druid-decimal</argument> ++ <argument>-c</argument> + <argument>io.druid.extensions.contrib:druid-rocketmq</argument> + <argument>-c</argument> + <argument>io.druid.extensions.contrib:druid-google-extensions</argument> +diff --git a/extensions-contrib/decimal/pom.xml b/extensions-contrib/decimal/pom.xml +new file mode 100644 +index 000000000..1f2740347 +--- /dev/null ++++ b/extensions-contrib/decimal/pom.xml +@@ -0,0 +1,59 @@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ ~ or more contributor license agreements. See the NOTICE file ++ ~ distributed with this work for additional information ++ ~ regarding copyright ownership. Metamarkets licenses this file ++ ~ to you under the Apache License, Version 2.0 (the ++ ~ "License"); you may not use this file except in compliance ++ ~ with the License. You may obtain a copy of the License at ++ ~ ++ ~ http://www.apache.org/licenses/LICENSE-2.0 ++ ~ ++ ~ Unless required by applicable law or agreed to in writing, ++ ~ software distributed under the License is distributed on an ++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ ~ KIND, either express or implied. See the License for the ++ ~ specific language governing permissions and limitations ++ ~ under the License. ++ --> ++ ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <groupId>io.druid.extensions.contrib</groupId> ++ <artifactId>druid-decimal</artifactId> ++ <name>druid-decimal</name> ++ <description>druid-decimal</description> ++ ++ <parent> ++ <groupId>io.druid</groupId> ++ <artifactId>druid</artifactId> ++ <version>0.11.1-SNAPSHOT</version> ++ <relativePath>../../pom.xml</relativePath> ++ </parent> ++ ++ <dependencies> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>provided</scope> ++ </dependency> ++ ++ <!-- Tests --> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>test</scope> ++ <type>test-jar</type> ++ </dependency> ++ <dependency> ++ <groupId>junit</groupId> ++ <artifactId>junit</artifactId> ++ <scope>test</scope> ++ </dependency> ++ </dependencies> ++ ++</project> +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java +new file mode 100644 +index 000000000..e09925b34 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java +@@ -0,0 +1,114 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.annotation.JsonProperty; ++import com.google.common.base.Preconditions; ++import io.druid.query.aggregation.AggregatorFactory; ++ ++import java.math.BigDecimal; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.List; ++ ++public abstract class DecimalAggregatorFactory extends AggregatorFactory ++{ ++ protected final String name; ++ protected final String fieldName; ++ protected final Integer precision; ++ ++ ++ public DecimalAggregatorFactory(String name, String fieldName, Integer precision) ++ { ++ Preconditions.checkNotNull(name); ++ Preconditions.checkNotNull(fieldName); ++ this.name = name; ++ this.fieldName = fieldName; ++ this.precision = precision; ++ } ++ ++ @Override ++ public Comparator getComparator() ++ { ++ return new Comparator() ++ { ++ @Override ++ public int compare(Object o1, Object o2) ++ { ++ return ((BigDecimal) o1).compareTo((BigDecimal) o2); ++ } ++ }; ++ } ++ ++ @Override ++ public Object deserialize(Object object) ++ { ++ if (object instanceof String) { ++ return new BigDecimal(((String) object)); ++ } ++ return object; ++ } ++ ++ @Override ++ public Object finalizeComputation(Object object) ++ { ++ return object; ++ } ++ ++ @JsonProperty ++ public Integer getPrecision() ++ { ++ return precision; ++ } ++ ++ @JsonProperty ++ public String getFieldName() ++ { ++ return fieldName; ++ } ++ ++ @Override ++ @JsonProperty ++ public String getName() ++ { ++ return name; ++ } ++ ++ @Override ++ public List<String> requiredFields() ++ { ++ return Arrays.asList(fieldName); ++ } ++ ++ ++ @Override ++ public int getMaxIntermediateSize() ++ { ++ return 6 + (precision + 1) / 2; ++ } ++ ++ @Override ++ public int hashCode() ++ { ++ int result = name.hashCode(); ++ result = 31 * result + fieldName.hashCode(); ++ return result; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java +new file mode 100644 +index 000000000..ff8967ba4 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java +@@ -0,0 +1,99 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.math.BigInteger; ++import java.nio.ByteBuffer; ++ ++public abstract class DecimalBufferAggregator implements BufferAggregator ++{ ++ protected final ObjectColumnSelector selector; ++ ++ public DecimalBufferAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ protected BigDecimal readTFromBuffer(ByteBuffer buf, int position) ++ { ++ int oldPosition = buf.position(); ++ int oldLimit = buf.limit(); ++ ++ buf.position(position); ++ short size = buf.getShort(); ++ int scale = buf.getInt(); ++ buf.limit(position + size); ++ byte[] bytes = new byte[buf.remaining()]; ++ buf.get(bytes, 0, bytes.length); ++ ++ buf.limit(oldLimit); ++ buf.position(oldPosition); ++ ++ return new BigDecimal(new BigInteger(bytes), scale); ++ } ++ ++ protected void writeToBuffer(BigDecimal value, ByteBuffer buf, int position) ++ { ++ int oldPosition = buf.position(); ++ ++ byte[] sumBytes = value.unscaledValue().toByteArray(); ++ short sumSize = (short) (6 + sumBytes.length); ++ ++ buf.position(position); ++ buf.putShort(sumSize); ++ buf.putInt(value.scale()); ++ buf.put(sumBytes); ++ ++ buf.position(oldPosition); ++ } ++ ++ @Override ++ public Object get(ByteBuffer buf, int position) ++ { ++ return readTFromBuffer(buf, position); ++ } ++ ++ @Override ++ public float getFloat(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("DecimalBufferAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("DecimalBufferAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("DecimalBufferAggregator does not support getDouble()"); ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java +new file mode 100644 +index 000000000..ac7f69549 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java +@@ -0,0 +1,71 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.jsontype.NamedType; ++import com.fasterxml.jackson.databind.module.SimpleModule; ++import com.google.common.collect.ImmutableList; ++import com.google.inject.Binder; ++import io.druid.initialization.DruidModule; ++import io.druid.segment.serde.ComplexMetrics; ++ ++import java.util.List; ++ ++public class DecimalDruidModule implements DruidModule ++{ ++ ++ public static final String DECIMALSUM = "decimalSum"; ++ public static final String DECIMALMAX = "decimalMax"; ++ public static final String DECIMALMIN = "decimalMin"; ++ ++ @Override ++ public List<? extends Module> getJacksonModules() ++ { ++ return ImmutableList.of( ++ new SimpleModule("DecimalModule").registerSubtypes( ++ new NamedType(DecimalSumAggregatorFactory.class, DECIMALSUM) ++ ), ++ new SimpleModule("DecimalModule").registerSubtypes( ++ new NamedType(DecimalMaxAggregatorFactory.class, DECIMALMAX) ++ ), ++ new SimpleModule("DecimalModule").registerSubtypes( ++ new NamedType(DecimalMinAggregatorFactory.class, DECIMALMIN) ++ ) ++ ); ++ } ++ ++ @Override ++ public void configure(Binder binder) ++ { ++ if (ComplexMetrics.getSerdeForType(DECIMALSUM) == null) { ++ ComplexMetrics.registerSerde(DECIMALSUM, new DecimalSumSerde()); ++ } ++ ++ if (ComplexMetrics.getSerdeForType(DECIMALMIN) == null) { ++ ComplexMetrics.registerSerde(DECIMALMIN, new DecimalMinSerde()); ++ } ++ ++ if (ComplexMetrics.getSerdeForType(DECIMALMAX) == null) { ++ ComplexMetrics.registerSerde(DECIMALMAX, new DecimalMaxSerde()); ++ } ++ ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java +new file mode 100644 +index 000000000..7dca0ae56 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java +@@ -0,0 +1,82 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.query.aggregation.Aggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++ ++public class DecimalMaxAggregator implements Aggregator ++{ ++ private final ObjectColumnSelector selector; ++ private BigDecimal max = null; ++ ++ public DecimalMaxAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void aggregate() ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ if (max == null) { ++ max = value; ++ } else if (max.compareTo(value) < 0) { ++ max = value; ++ } ++ } ++ ++ @Override ++ public void reset() ++ { ++ max = null; ++ } ++ ++ @Override ++ public Object get() ++ { ++ return max; ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++ ++ @Override ++ public float getFloat() ++ { ++ throw new UnsupportedOperationException("DecimalMaxAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong() ++ { ++ throw new UnsupportedOperationException("DecimalMaxAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble() ++ { ++ throw new UnsupportedOperationException("DecimalMaxAggregator does not support getDouble()"); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java +new file mode 100644 +index 000000000..91c826592 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java +@@ -0,0 +1,160 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import io.druid.java.util.common.IAE; ++import io.druid.java.util.common.StringUtils; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.aggregation.AggregatorUtil; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ColumnSelectorFactory; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.List; ++ ++public class DecimalMaxAggregatorFactory extends DecimalAggregatorFactory ++{ ++ private static final byte CACHE_TYPE_ID = 28; ++ ++ @JsonCreator ++ public DecimalMaxAggregatorFactory( ++ @JsonProperty("name") String name, ++ @JsonProperty("fieldName") String fieldName, ++ @JsonProperty("precision") Integer precision ++ ) ++ { ++ super(name, fieldName, precision); ++ } ++ ++ @Override ++ public Aggregator factorize(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ if (selector == null) { ++ throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null"); ++ } else { ++ return new DecimalMaxAggregator(selector); ++ } ++ } ++ ++ @Override ++ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ ++ final Class classOfObject = selector.classOfObject(); ++ if (!classOfObject.equals(Object.class) && !BigDecimal.class.isAssignableFrom(classOfObject)) { ++ throw new IAE("Incompatible type for metric[%s], expected a ExtendByteArray, got a %s", fieldName, classOfObject); ++ } ++ ++ return new DecimalMaxBufferAggregator(selector); ++ } ++ ++ @Override ++ public Comparator getComparator() ++ { ++ return new Comparator() ++ { ++ @Override ++ public int compare(Object o1, Object o2) ++ { ++ return ((BigDecimal) o1).compareTo((BigDecimal) o2); ++ } ++ }; ++ } ++ ++ @Override ++ public Object combine(Object o1, Object o2) ++ { ++ BigDecimal left = (BigDecimal) o1; ++ BigDecimal right = (BigDecimal) o2; ++ ++ if (left.compareTo(right) > 0) { ++ return left; ++ } else { ++ return right; ++ } ++ } ++ ++ @Override ++ public AggregatorFactory getCombiningFactory() ++ { ++ return new DecimalMaxAggregatorFactory(name, name, precision); ++ } ++ ++ @Override ++ public List<AggregatorFactory> getRequiredColumns() ++ { ++ return Arrays.<AggregatorFactory>asList(new DecimalMaxAggregatorFactory(fieldName, fieldName, precision)); ++ } ++ ++ @Override ++ public byte[] getCacheKey() ++ { ++ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); ++ return ByteBuffer.allocate(2 + fieldNameBytes.length) ++ .put(CACHE_TYPE_ID) ++ .put(fieldNameBytes) ++ .put(AggregatorUtil.STRING_SEPARATOR) ++ .array(); ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALMAX; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ++ DecimalMaxAggregatorFactory that = (DecimalMaxAggregatorFactory) o; ++ ++ if (!fieldName.equals(that.fieldName)) { ++ return false; ++ } ++ ++ if (!name.equals(that.name)) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return "DecimalMaxAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}'; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java +new file mode 100644 +index 000000000..d2a933c39 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java +@@ -0,0 +1,56 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++ ++public class DecimalMaxBufferAggregator extends DecimalBufferAggregator ++{ ++ ++ public DecimalMaxBufferAggregator(ObjectColumnSelector selector) ++ { ++ super(selector); ++ } ++ ++ @Override ++ public void init(ByteBuffer buf, int position) ++ { ++ BigDecimal init = new BigDecimal(Long.MIN_VALUE); ++ ++ writeToBuffer(init, buf, position); ++ } ++ ++ @Override ++ public void aggregate(ByteBuffer buf, int position) ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ ++ BigDecimal max = readTFromBuffer(buf, position); ++ ++ if (max.compareTo(value) < 0) { ++ max = value; ++ } ++ ++ writeToBuffer(max, buf, position); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java +new file mode 100644 +index 000000000..eccaec0ba +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java +@@ -0,0 +1,29 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++public class DecimalMaxSerde extends DecimalSerde ++{ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALMAX; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java +new file mode 100644 +index 000000000..f69b6e1e3 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java +@@ -0,0 +1,83 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.query.aggregation.Aggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++ ++public class DecimalMinAggregator implements Aggregator ++{ ++ private final ObjectColumnSelector selector; ++ private BigDecimal min = null; ++ ++ public DecimalMinAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void aggregate() ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ ++ if (min == null) { ++ min = value; ++ } else if (min.compareTo(value) > 0) { ++ min = value; ++ } ++ } ++ ++ @Override ++ public void reset() ++ { ++ min = null; ++ } ++ ++ @Override ++ public Object get() ++ { ++ return min; ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++ ++ @Override ++ public float getFloat() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getDouble()"); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java +new file mode 100644 +index 000000000..9612033f4 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java +@@ -0,0 +1,149 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import io.druid.java.util.common.IAE; ++import io.druid.java.util.common.StringUtils; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.aggregation.AggregatorUtil; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ColumnSelectorFactory; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.List; ++ ++/** ++ * Created by kangkaisen on 2017/11/28. ++ */ ++public class DecimalMinAggregatorFactory extends DecimalAggregatorFactory ++{ ++ private static final byte CACHE_TYPE_ID = 29; ++ ++ @JsonCreator ++ public DecimalMinAggregatorFactory( ++ @JsonProperty("name") String name, ++ @JsonProperty("fieldName") String fieldName, ++ @JsonProperty("precision") Integer precision ++ ) ++ { ++ super(name, fieldName, precision); ++ } ++ ++ @Override ++ public Aggregator factorize(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ if (selector == null) { ++ throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null"); ++ } else { ++ return new DecimalMinAggregator(selector); ++ } ++ } ++ ++ @Override ++ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ ++ final Class classOfObject = selector.classOfObject(); ++ if (!classOfObject.equals(Object.class) && !BigDecimal.class.isAssignableFrom(classOfObject)) { ++ throw new IAE("Incompatible type for metric[%s], expected a ExtendByteArray, got a %s", fieldName, classOfObject); ++ } ++ ++ return new DecimalMinBufferAggregator(selector); ++ } ++ ++ @Override ++ public Object combine(Object o1, Object o2) ++ { ++ BigDecimal left = (BigDecimal) o1; ++ BigDecimal right = (BigDecimal) o2; ++ ++ if (left.compareTo(right) < 0) { ++ return left; ++ } else { ++ return right; ++ } ++ } ++ ++ @Override ++ public AggregatorFactory getCombiningFactory() ++ { ++ return new DecimalMinAggregatorFactory(name, name, precision); ++ } ++ ++ @Override ++ public List<AggregatorFactory> getRequiredColumns() ++ { ++ return Arrays.<AggregatorFactory>asList(new DecimalMinAggregatorFactory(fieldName, fieldName, precision)); ++ } ++ ++ @Override ++ public byte[] getCacheKey() ++ { ++ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); ++ return ByteBuffer.allocate(2 + fieldNameBytes.length) ++ .put(CACHE_TYPE_ID) ++ .put(fieldNameBytes) ++ .put(AggregatorUtil.STRING_SEPARATOR) ++ .array(); ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALMIN; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ++ DecimalMinAggregatorFactory that = (DecimalMinAggregatorFactory) o; ++ ++ if (!fieldName.equals(that.fieldName)) { ++ return false; ++ } ++ ++ if (!name.equals(that.name)) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return "DecimalMinAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}'; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java +new file mode 100644 +index 000000000..db12f46eb +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java +@@ -0,0 +1,56 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++ ++public class DecimalMinBufferAggregator extends DecimalBufferAggregator ++{ ++ ++ public DecimalMinBufferAggregator(ObjectColumnSelector selector) ++ { ++ super(selector); ++ } ++ ++ @Override ++ public void init(ByteBuffer buf, int position) ++ { ++ BigDecimal init = new BigDecimal(Long.MAX_VALUE); ++ ++ writeToBuffer(init, buf, position); ++ } ++ ++ @Override ++ public void aggregate(ByteBuffer buf, int position) ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ ++ BigDecimal min = readTFromBuffer(buf, position); ++ ++ if (min.compareTo(value) > 0) { ++ min = value; ++ } ++ ++ writeToBuffer(min, buf, position); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java +new file mode 100644 +index 000000000..b09146eb1 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java +@@ -0,0 +1,29 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++public class DecimalMinSerde extends DecimalSerde ++{ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALMIN; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java +new file mode 100644 +index 000000000..ea674d0f4 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java +@@ -0,0 +1,135 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.google.common.primitives.Ints; ++import io.druid.data.input.InputRow; ++import io.druid.java.util.common.IAE; ++import io.druid.segment.GenericColumnSerializer; ++import io.druid.segment.column.ColumnBuilder; ++import io.druid.segment.data.GenericIndexed; ++import io.druid.segment.data.IOPeon; ++import io.druid.segment.data.ObjectStrategy; ++import io.druid.segment.serde.ComplexColumnPartSupplier; ++import io.druid.segment.serde.ComplexMetricExtractor; ++import io.druid.segment.serde.ComplexMetricSerde; ++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; ++import org.apache.commons.lang.ArrayUtils; ++ ++import java.math.BigDecimal; ++import java.math.BigInteger; ++import java.nio.ByteBuffer; ++ ++public abstract class DecimalSerde extends ComplexMetricSerde ++{ ++ ++ public DecimalSerde() ++ { ++ } ++ ++ @Override ++ public ComplexMetricExtractor getExtractor() ++ { ++ return new ComplexMetricExtractor() ++ { ++ @Override ++ public Class<BigDecimal> extractedClass() ++ { ++ return BigDecimal.class; ++ } ++ ++ @Override ++ public BigDecimal extractValue(InputRow inputRow, String metricName) ++ { ++ Object rawValue = inputRow.getRaw(metricName); ++ ++ if (BigDecimal.class.isAssignableFrom(rawValue.getClass())) { ++ return (BigDecimal) rawValue; ++ } else { ++ throw new IAE("The class must be ExtendByteArray"); ++ } ++ } ++ }; ++ } ++ ++ @Override ++ public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder) ++ { ++ final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy()); ++ columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); ++ } ++ ++ @Override ++ public ObjectStrategy getObjectStrategy() ++ { ++ return new ObjectStrategy() ++ { ++ @Override ++ public Class getClazz() ++ { ++ return BigDecimal.class; ++ } ++ ++ @Override ++ public Object fromByteBuffer(ByteBuffer buffer, int numBytes) ++ { ++ final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); ++ readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); ++ ++ int scale = readOnlyBuffer.getInt(); ++ byte[] bytes = new byte[readOnlyBuffer.remaining()]; ++ readOnlyBuffer.get(bytes, 0, bytes.length); ++ ++ return new BigDecimal(new BigInteger(bytes), scale); ++ } ++ ++ @Override ++ public byte[] toBytes(Object val) ++ { ++ if (val == null) { ++ return new byte[]{}; ++ } ++ ++ if (val instanceof BigDecimal) { ++ BigDecimal value = (BigDecimal) val; ++ ++ byte[] scaleBytes = Ints.toByteArray(value.scale()); ++ byte[] bytes = value.unscaledValue().toByteArray(); ++ ++ return ArrayUtils.addAll(scaleBytes, bytes); ++ } else { ++ throw new IAE("Unknown class[%s], toString[%s]", val.getClass(), val); ++ } ++ } ++ ++ @Override ++ public int compare(Object o1, Object o2) ++ { ++ return ((BigDecimal) o1).compareTo((BigDecimal) o2); ++ } ++ }; ++ } ++ ++ @Override ++ public GenericColumnSerializer getSerializer(IOPeon peon, String column) ++ { ++ return LargeColumnSupportedComplexColumnSerializer.create(peon, column, this.getObjectStrategy()); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java +new file mode 100644 +index 000000000..f381b47dc +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java +@@ -0,0 +1,80 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.query.aggregation.Aggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++ ++public class DecimalSumAggregator implements Aggregator ++{ ++ ++ private final ObjectColumnSelector selector; ++ private BigDecimal sum = new BigDecimal(0); ++ ++ public DecimalSumAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void aggregate() ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ ++ sum = sum.add(value); ++ } ++ ++ @Override ++ public void reset() ++ { ++ sum = new BigDecimal(0); ++ } ++ ++ @Override ++ public Object get() ++ { ++ return sum; ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++ ++ @Override ++ public float getFloat() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getDouble()"); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java +new file mode 100644 +index 000000000..329609341 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java +@@ -0,0 +1,143 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++ ++import io.druid.java.util.common.IAE; ++import io.druid.java.util.common.StringUtils; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.aggregation.AggregatorUtil; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ColumnSelectorFactory; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.List; ++ ++public class DecimalSumAggregatorFactory extends DecimalAggregatorFactory ++{ ++ private static final byte CACHE_TYPE_ID = 27; ++ ++ @JsonCreator ++ public DecimalSumAggregatorFactory( ++ @JsonProperty("name") String name, ++ @JsonProperty("fieldName") String fieldName, ++ @JsonProperty("precision") Integer precision ++ ) ++ { ++ super(name, fieldName, precision); ++ } ++ ++ @Override ++ public Aggregator factorize(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ if (selector == null) { ++ throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null"); ++ } else { ++ return new DecimalSumAggregator(selector); ++ } ++ } ++ ++ @Override ++ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ ++ final Class classOfObject = selector.classOfObject(); ++ if (!classOfObject.equals(Object.class) && !BigDecimal.class.isAssignableFrom(classOfObject)) { ++ throw new IAE("Incompatible type for metric[%s], expected a ExtendByteArray, got a %s", fieldName, classOfObject); ++ } ++ ++ return new DecimalSumBufferAggregator(selector); ++ } ++ ++ @Override ++ public Object combine(Object o1, Object o2) ++ { ++ BigDecimal left = (BigDecimal) o1; ++ BigDecimal right = (BigDecimal) o2; ++ ++ return left.add(right); ++ } ++ ++ @Override ++ public AggregatorFactory getCombiningFactory() ++ { ++ return new DecimalSumAggregatorFactory(name, name, precision); ++ } ++ ++ @Override ++ public List<AggregatorFactory> getRequiredColumns() ++ { ++ return Arrays.<AggregatorFactory>asList(new DecimalSumAggregatorFactory(fieldName, fieldName, precision)); ++ } ++ ++ @Override ++ public byte[] getCacheKey() ++ { ++ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); ++ return ByteBuffer.allocate(2 + fieldNameBytes.length) ++ .put(CACHE_TYPE_ID) ++ .put(fieldNameBytes) ++ .put(AggregatorUtil.STRING_SEPARATOR) ++ .array(); ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALSUM; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ++ DecimalSumAggregatorFactory that = (DecimalSumAggregatorFactory) o; ++ ++ if (!fieldName.equals(that.fieldName)) { ++ return false; ++ } ++ ++ if (!name.equals(that.name)) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return "DecimalSumAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}'; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java +new file mode 100644 +index 000000000..69f510eb1 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java +@@ -0,0 +1,54 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.math.BigDecimal; ++import java.nio.ByteBuffer; ++ ++public class DecimalSumBufferAggregator extends DecimalBufferAggregator ++{ ++ ++ public DecimalSumBufferAggregator(ObjectColumnSelector selector) ++ { ++ super(selector); ++ } ++ ++ @Override ++ public void init(ByteBuffer buf, int position) ++ { ++ BigDecimal zero = BigDecimal.ZERO; ++ ++ writeToBuffer(zero, buf, position); ++ } ++ ++ @Override ++ public void aggregate(ByteBuffer buf, int position) ++ { ++ BigDecimal value = (BigDecimal) selector.getObject(); ++ ++ BigDecimal oldSum = readTFromBuffer(buf, position); ++ ++ BigDecimal sum = oldSum.add(value); ++ ++ writeToBuffer(sum, buf, position); ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java +new file mode 100644 +index 000000000..d330bae3d +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java +@@ -0,0 +1,29 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++public class DecimalSumSerde extends DecimalSerde ++{ ++ @Override ++ public String getTypeName() ++ { ++ return DecimalDruidModule.DECIMALSUM; ++ } ++} +diff --git a/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +new file mode 100644 +index 000000000..86c36b366 +--- /dev/null ++++ b/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +@@ -0,0 +1 @@ ++io.druid.query.aggregation.decimal.DecimalDruidModule +diff --git a/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java b/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java +new file mode 100644 +index 000000000..e7207fa2e +--- /dev/null ++++ b/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java +@@ -0,0 +1,250 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.decimal; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import com.google.common.collect.ImmutableMap; ++import com.google.common.collect.Lists; ++import io.druid.data.input.MapBasedInputRow; ++import io.druid.data.input.Row; ++import io.druid.jackson.DefaultObjectMapper; ++import io.druid.java.util.common.Intervals; ++import io.druid.query.QueryRunnerTestHelper; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.dimension.DefaultDimensionSpec; ++import io.druid.query.dimension.DimensionSpec; ++import io.druid.query.groupby.GroupByQuery; ++import io.druid.query.groupby.GroupByQueryConfig; ++import io.druid.query.groupby.GroupByQueryRunnerFactory; ++import io.druid.query.groupby.GroupByQueryRunnerTest; ++import io.druid.query.groupby.GroupByQueryRunnerTestHelper; ++import io.druid.query.groupby.orderby.DefaultLimitSpec; ++import io.druid.query.groupby.orderby.OrderByColumnSpec; ++import io.druid.segment.IndexIO; ++import io.druid.segment.IndexMerger; ++import io.druid.segment.IndexMergerTestBase; ++import io.druid.segment.IndexMergerV9; ++import io.druid.segment.IndexSpec; ++import io.druid.segment.QueryableIndex; ++import io.druid.segment.QueryableIndexSegment; ++import io.druid.segment.TestHelper; ++import io.druid.segment.column.ColumnConfig; ++import io.druid.segment.data.CompressedObjectStrategy; ++import io.druid.segment.data.CompressionFactory; ++import io.druid.segment.data.ConciseBitmapSerdeFactory; ++import io.druid.segment.incremental.IncrementalIndex; ++import io.druid.segment.incremental.IncrementalIndexSchema; ++import io.druid.segment.serde.ComplexMetrics; ++import org.apache.commons.io.FileUtils; ++import org.junit.Test; ++ ++import java.io.File; ++import java.math.BigDecimal; ++import java.util.Arrays; ++import java.util.List; ++ ++public class DecimalGroupByQueryTest ++{ ++ ++ private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec( ++ new ConciseBitmapSerdeFactory(), ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressionFactory.LongEncodingStrategy.LONGS ++ ); ++ private static IndexMerger INDEX_MERGER; ++ ++ private static ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); ++ ++ private static IndexIO INDEX_IO; ++ ++ static { ++ if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALSUM) == null) { ++ ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALSUM, new DecimalSumSerde()); ++ } ++ ++ if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALMIN) == null) { ++ ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALMIN, new DecimalMinSerde()); ++ } ++ ++ if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALMAX) == null) { ++ ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALMAX, new DecimalMaxSerde()); ++ } ++ ++ for (Module mod : new DecimalDruidModule().getJacksonModules()) { ++ JSON_MAPPER.registerModule(mod); ++ } ++ ++ INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() ++ { ++ @Override ++ public int columnCacheSizeBytes() ++ { ++ return 0; ++ } ++ }); ++ ++ INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO); ++ } ++ ++ @Test ++ public void testGroupByWithDistinctCountAgg() throws Exception ++ { ++ final GroupByQueryConfig config = new GroupByQueryConfig(); ++ config.setMaxIntermediateRows(100000000); ++ final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config); ++ ++ String visitor_id = "visitor_id"; ++ String client_type = "client_type"; ++ String sum = "sum"; ++ String max = "max"; ++ String min = "min"; ++ long timestamp = System.currentTimeMillis(); ++ ++ IncrementalIndex index = new IncrementalIndex.Builder() ++ .setIndexSchema( ++ new IncrementalIndexSchema.Builder() ++ .withMetrics(new AggregatorFactory[]{ ++ new DecimalSumAggregatorFactory(sum, sum, 100), ++ new DecimalMaxAggregatorFactory(max, max, 100), ++ new DecimalMinAggregatorFactory(min, min, 100), ++ }) ++ .withRollup(false) ++ .build() ++ ) ++ .setReportParseExceptions(false) ++ .setConcurrentEventAdd(true) ++ .setMaxRowCount(1000000) ++ .buildOnheap(); ++ ++ BigDecimal decimal1 = new BigDecimal(2.0); ++ BigDecimal decimal2 = new BigDecimal(3.0000); ++ BigDecimal decimal3 = new BigDecimal(0.8888); ++ ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ sum, ++ decimal1, ++ max, ++ decimal1, ++ min, ++ decimal1 ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ sum, ++ decimal2, ++ max, ++ decimal2, ++ min, ++ decimal2 ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "android", ++ sum, ++ decimal3, ++ max, ++ decimal3, ++ min, ++ decimal3 ++ ) ++ )); ++ ++ final File finalFile = new File("tmp"); ++ INDEX_MERGER.persist( ++ index, ++ Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"), ++ finalFile, ++ INDEX_SPEC ++ ); ++ ++ QueryableIndex queryableIndex = INDEX_IO.loadIndex(finalFile); ++ QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex); ++ ++ GroupByQuery query = new GroupByQuery.Builder().setDataSource(QueryRunnerTestHelper.dataSource) ++ .setGranularity(QueryRunnerTestHelper.allGran) ++ .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec( ++ client_type, ++ client_type ++ ))) ++ .setInterval(QueryRunnerTestHelper.fullOnInterval) ++ .setLimitSpec( ++ new DefaultLimitSpec( ++ Lists.newArrayList( ++ new OrderByColumnSpec( ++ client_type, ++ OrderByColumnSpec.Direction.DESCENDING ++ ) ++ ), 10)) ++ .setAggregatorSpecs(Lists.newArrayList( ++ new DecimalSumAggregatorFactory(sum, sum, 100), ++ new DecimalMaxAggregatorFactory(max, max, 100), ++ new DecimalMinAggregatorFactory(min, min, 100) ++ )) ++ .build(); ++ ++ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, factory.createRunner(segment), query); ++ ++ ++ BigDecimal decimalSum1 = decimal1.add(decimal2); ++ ++ List<Row> expectedResults = Arrays.asList( ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "iphone", ++ sum, decimalSum1, ++ max, decimal2, ++ min, decimal1 ++ ), ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "android", ++ sum, decimal3, ++ max, decimal3, ++ min, decimal3 ++ ) ++ ); ++ ++ TestHelper.assertExpectedObjects(expectedResults, results, "decimal"); ++ ++ FileUtils.deleteDirectory(finalFile); ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/pom.xml b/extensions-contrib/kylin-distinccount/pom.xml +new file mode 100644 +index 000000000..a2031f4e5 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/pom.xml +@@ -0,0 +1,59 @@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ ~ or more contributor license agreements. See the NOTICE file ++ ~ distributed with this work for additional information ++ ~ regarding copyright ownership. Metamarkets licenses this file ++ ~ to you under the Apache License, Version 2.0 (the ++ ~ "License"); you may not use this file except in compliance ++ ~ with the License. You may obtain a copy of the License at ++ ~ ++ ~ http://www.apache.org/licenses/LICENSE-2.0 ++ ~ ++ ~ Unless required by applicable law or agreed to in writing, ++ ~ software distributed under the License is distributed on an ++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ ~ KIND, either express or implied. See the License for the ++ ~ specific language governing permissions and limitations ++ ~ under the License. ++ --> ++ ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <groupId>io.druid.extensions.contrib</groupId> ++ <artifactId>druid-kylin-distinctcount</artifactId> ++ <name>druid-kylin-distinctcount</name> ++ <description>druid-kylin-distinctcount</description> ++ ++ <parent> ++ <groupId>io.druid</groupId> ++ <artifactId>druid</artifactId> ++ <version>0.11.1-SNAPSHOT</version> ++ <relativePath>../../pom.xml</relativePath> ++ </parent> ++ ++ <dependencies> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>provided</scope> ++ </dependency> ++ ++ <!-- Tests --> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>test</scope> ++ <type>test-jar</type> ++ </dependency> ++ <dependency> ++ <groupId>junit</groupId> ++ <artifactId>junit</artifactId> ++ <scope>test</scope> ++ </dependency> ++ </dependencies> ++ ++</project> +diff --git a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java +new file mode 100644 +index 000000000..48db064d9 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java +@@ -0,0 +1,73 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import io.druid.collections.bitmap.ImmutableBitmap; ++import io.druid.collections.bitmap.RoaringBitmapFactory; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++public class DistinctCountAggregator implements Aggregator ++{ ++ ++ private final ObjectColumnSelector selector; ++ private ImmutableBitmap bitmap = new RoaringBitmapFactory().makeEmptyImmutableBitmap(); ++ ++ public DistinctCountAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void aggregate() ++ { ++ bitmap = bitmap.union((ImmutableBitmap) selector.getObject()); ++ } ++ ++ @Override ++ public void reset() ++ { ++ bitmap = null; ++ } ++ ++ @Override ++ public Object get() ++ { ++ return bitmap; ++ } ++ ++ @Override ++ public void close() ++ { ++ bitmap = null; ++ } ++ ++ @Override ++ public float getFloat() ++ { ++ throw new UnsupportedOperationException("DistinctCountAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong() ++ { ++ throw new UnsupportedOperationException("DistinctCountAggregator does not support getLong()"); ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java +new file mode 100644 +index 000000000..03f04b3ff +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java +@@ -0,0 +1,223 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import com.google.common.base.Preconditions; ++ ++import io.druid.collections.bitmap.ImmutableBitmap; ++import io.druid.collections.bitmap.RoaringBitmapFactory; ++import io.druid.java.util.common.IAE; ++import io.druid.java.util.common.StringUtils; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.aggregation.AggregatorUtil; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ColumnSelectorFactory; ++import io.druid.segment.ObjectColumnSelector; ++import io.druid.segment.data.RoaringBitmapSerdeFactory; ++import org.apache.commons.codec.binary.Base64; ++ ++import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.List; ++ ++public class DistinctCountAggregatorFactory extends AggregatorFactory ++{ ++ private static final byte CACHE_TYPE_ID = 21; ++ ++ private final String name; ++ private final String fieldName; ++ ++ @JsonCreator ++ public DistinctCountAggregatorFactory(@JsonProperty("name") String name, @JsonProperty("fieldName") String fieldName) ++ { ++ Preconditions.checkNotNull(name); ++ Preconditions.checkNotNull(fieldName); ++ this.name = name; ++ this.fieldName = fieldName; ++ } ++ ++ @Override ++ public Aggregator factorize(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ if (selector == null) { ++ throw new IAE("selector in DistinctCountAggregatorFactory should not be Null"); ++ } else { ++ return new DistinctCountAggregator(selector); ++ } ++ } ++ ++ @Override ++ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ ++ final Class classOfObject = selector.classOfObject(); ++ if (!classOfObject.equals(Object.class) && !ImmutableBitmap.class.isAssignableFrom(classOfObject)) { ++ throw new IAE("Incompatible type for metric[%s], expected a MutableBitmap, got a %s", fieldName, classOfObject); ++ } ++ ++ return new DistinctCountBufferAggregator(selector); ++ } ++ ++ @Override ++ public Comparator getComparator() ++ { ++ return new Comparator() ++ { ++ @Override ++ public int compare(Object o, Object o1) ++ { ++ return new RoaringBitmapSerdeFactory(true).getObjectStrategy() ++ .compare((ImmutableBitmap) o, (ImmutableBitmap) o1); ++ } ++ }; ++ } ++ ++ @Override ++ public Object combine(Object lhs, Object rhs) ++ { ++ if (rhs == null) { ++ return lhs; ++ } ++ if (lhs == null) { ++ return rhs; ++ } ++ ++ return ((ImmutableBitmap) lhs).union((ImmutableBitmap) rhs); ++ } ++ ++ @Override ++ public AggregatorFactory getCombiningFactory() ++ { ++ return new DistinctCountAggregatorFactory(name, name); ++ } ++ ++ @Override ++ public List<AggregatorFactory> getRequiredColumns() ++ { ++ return Arrays.<AggregatorFactory>asList(new DistinctCountAggregatorFactory(fieldName, fieldName)); ++ } ++ ++ @Override ++ public Object deserialize(Object object) ++ { ++ final ByteBuffer buffer; ++ if (object instanceof String) { ++ buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object))); ++ } else if (object instanceof byte[]) { ++ buffer = ByteBuffer.wrap((byte[]) object); ++ } else if (object instanceof ByteBuffer) { ++ // Be conservative, don't assume we own this buffer. ++ buffer = ((ByteBuffer) object).duplicate(); ++ } else { ++ return object; ++ } ++ return new RoaringBitmapFactory().mapImmutableBitmap(buffer); ++ } ++ ++ @Override ++ public Object finalizeComputation(Object object) ++ { ++ return object; ++ } ++ ++ @JsonProperty ++ public String getFieldName() ++ { ++ return fieldName; ++ } ++ ++ @Override ++ @JsonProperty ++ public String getName() ++ { ++ return name; ++ } ++ ++ @Override ++ public List<String> requiredFields() ++ { ++ return Arrays.asList(fieldName); ++ } ++ ++ @Override ++ public byte[] getCacheKey() ++ { ++ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); ++ return ByteBuffer.allocate(2 + fieldNameBytes.length) ++ .put(CACHE_TYPE_ID) ++ .put(fieldNameBytes) ++ .put(AggregatorUtil.STRING_SEPARATOR) ++ .array(); ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return DistinctCountDruidModule.DISTINCT_COUNT; ++ } ++ ++ @Override ++ public int getMaxIntermediateSize() ++ { ++ return 4; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ++ DistinctCountAggregatorFactory that = (DistinctCountAggregatorFactory) o; ++ ++ if (!fieldName.equals(that.fieldName)) { ++ return false; ++ } ++ if (!name.equals(that.name)) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ @Override ++ public int hashCode() ++ { ++ int result = name.hashCode(); ++ result = 31 * result + fieldName.hashCode(); ++ return result; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return "DistinctCountAggregatorFactory{" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}'; ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java +new file mode 100644 +index 000000000..1693972d0 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java +@@ -0,0 +1,120 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import io.druid.collections.bitmap.ImmutableBitmap; ++import io.druid.collections.bitmap.RoaringBitmapFactory; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ObjectColumnSelector; ++import it.unimi.dsi.fastutil.ints.Int2ObjectMap; ++import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; ++ ++import java.nio.ByteBuffer; ++import java.util.IdentityHashMap; ++ ++public class DistinctCountBufferAggregator implements BufferAggregator ++{ ++ private final ObjectColumnSelector selector; ++ private final IdentityHashMap<ByteBuffer, Int2ObjectMap<ImmutableBitmap>> bitmaps = new IdentityHashMap<>(); ++ ++ public DistinctCountBufferAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void init(ByteBuffer buf, int position) ++ { ++ createNewBitmap(buf, position); ++ } ++ ++ @Override ++ public void aggregate(ByteBuffer buf, int position) ++ { ++ ImmutableBitmap updateBitmap = (ImmutableBitmap) selector.getObject(); ++ if (updateBitmap == null) { ++ return; ++ } ++ ++ ImmutableBitmap oldBitmap = getBitmap(buf, position); ++ ImmutableBitmap mergedBitmap = oldBitmap.union(updateBitmap); ++ bitmaps.get(buf).put(position, mergedBitmap); ++ } ++ ++ private ImmutableBitmap createNewBitmap(ByteBuffer buf, int position) ++ { ++ ImmutableBitmap immutableBitmap = new RoaringBitmapFactory().makeEmptyImmutableBitmap(); ++ Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(buf); ++ if (bitmapMap == null) { ++ bitmapMap = new Int2ObjectOpenHashMap<>(); ++ bitmaps.put(buf, bitmapMap); ++ } ++ bitmapMap.put(position, immutableBitmap); ++ return immutableBitmap; ++ } ++ ++ //Note that this is not threadsafe and I don't think it needs to be ++ private ImmutableBitmap getBitmap(ByteBuffer buf, int position) ++ { ++ Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(buf); ++ ImmutableBitmap bitmap = bitmapMap != null ? bitmapMap.get(position) : null; ++ if (bitmap != null) { ++ return bitmap; ++ } ++ return createNewBitmap(buf, position); ++ } ++ ++ @Override ++ public Object get(ByteBuffer buf, int position) ++ { ++ return getBitmap(buf, position); ++ } ++ ++ @Override ++ public float getFloat(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("DistinctCountBufferAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("DistinctCountBufferAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) ++ { ++ createNewBitmap(newBuffer, newPosition); ++ Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(oldBuffer); ++ if (bitmapMap != null) { ++ bitmaps.get(newBuffer).put(newPosition, bitmapMap.get(oldPosition)); ++ bitmapMap.remove(oldPosition); ++ if (bitmapMap.isEmpty()) { ++ bitmaps.remove(oldBuffer); ++ } ++ } ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java +new file mode 100644 +index 000000000..c8e729ba7 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java +@@ -0,0 +1,53 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.jsontype.NamedType; ++import com.fasterxml.jackson.databind.module.SimpleModule; ++import com.google.common.collect.ImmutableList; ++import com.google.inject.Binder; ++import io.druid.initialization.DruidModule; ++import io.druid.segment.serde.ComplexMetrics; ++ ++import java.util.List; ++ ++public class DistinctCountDruidModule implements DruidModule ++{ ++ public static final String DISTINCT_COUNT = "kylin-distinctCount"; ++ ++ @Override ++ public List<? extends Module> getJacksonModules() ++ { ++ return ImmutableList.of( ++ new SimpleModule("KylinDistinctCountModule").registerSubtypes( ++ new NamedType(DistinctCountAggregatorFactory.class, DISTINCT_COUNT) ++ ) ++ ); ++ } ++ ++ @Override ++ public void configure(Binder binder) ++ { ++ if (ComplexMetrics.getSerdeForType(DISTINCT_COUNT) == null) { ++ ComplexMetrics.registerSerde(DISTINCT_COUNT, new DistinctCountSerde()); ++ } ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java +new file mode 100644 +index 000000000..3559ae1e0 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java +@@ -0,0 +1,95 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import io.druid.collections.bitmap.ImmutableBitmap; ++import io.druid.data.input.InputRow; ++import io.druid.java.util.common.IAE; ++import io.druid.segment.GenericColumnSerializer; ++import io.druid.segment.column.ColumnBuilder; ++import io.druid.segment.data.GenericIndexed; ++import io.druid.segment.data.IOPeon; ++import io.druid.segment.data.ObjectStrategy; ++import io.druid.segment.data.RoaringBitmapSerdeFactory; ++import io.druid.segment.serde.ComplexColumnPartSupplier; ++import io.druid.segment.serde.ComplexMetricExtractor; ++import io.druid.segment.serde.ComplexMetricSerde; ++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; ++ ++import java.nio.ByteBuffer; ++ ++public class DistinctCountSerde extends ComplexMetricSerde ++{ ++ ++ public DistinctCountSerde() ++ { ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return DistinctCountDruidModule.DISTINCT_COUNT; ++ } ++ ++ @Override ++ public ComplexMetricExtractor getExtractor() ++ { ++ return new ComplexMetricExtractor() ++ { ++ @Override ++ public Class<ImmutableBitmap> extractedClass() ++ { ++ return ImmutableBitmap.class; ++ } ++ ++ @Override ++ public ImmutableBitmap extractValue(InputRow inputRow, String metricName) ++ { ++ Object rawValue = inputRow.getRaw(metricName); ++ ++ if (ImmutableBitmap.class.isAssignableFrom(rawValue.getClass())) { ++ return (ImmutableBitmap) rawValue; ++ } else { ++ throw new IAE("The class must be MutableBitmap"); ++ } ++ } ++ }; ++ } ++ ++ @Override ++ public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder) ++ { ++ final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy()); ++ columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); ++ } ++ ++ @Override ++ public ObjectStrategy getObjectStrategy() ++ { ++ RoaringBitmapSerdeFactory bitmapSerdeFactory = new RoaringBitmapSerdeFactory(true); ++ return bitmapSerdeFactory.getObjectStrategy(); ++ } ++ ++ @Override ++ public GenericColumnSerializer getSerializer(IOPeon peon, String column) ++ { ++ return LargeColumnSupportedComplexColumnSerializer.create(peon, column, this.getObjectStrategy()); ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +new file mode 100644 +index 000000000..abad7df15 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +@@ -0,0 +1 @@ ++io.druid.query.aggregation.kylin.distinctcount.DistinctCountDruidModule +diff --git a/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java +new file mode 100644 +index 000000000..8bfd9b60a +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java +@@ -0,0 +1,71 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import com.google.common.collect.ImmutableMap; ++import io.druid.collections.bitmap.ImmutableBitmap; ++import io.druid.collections.bitmap.MutableBitmap; ++import io.druid.collections.bitmap.RoaringBitmapFactory; ++import io.druid.data.input.MapBasedRow; ++import io.druid.query.aggregation.AggregationTestHelper; ++import io.druid.query.groupby.GroupByQueryConfig; ++import io.druid.query.groupby.epinephelinae.GrouperTestUtil; ++import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; ++import org.junit.Assert; ++import org.junit.Rule; ++import org.junit.Test; ++import org.junit.rules.TemporaryFolder; ++ ++public class DistinctCountBufferAggregatorTest ++{ ++ @Rule ++ public final TemporaryFolder tempFolder = new TemporaryFolder(); ++ ++ @Test ++ public void testRelocation() ++ { ++ DistinctCountDruidModule module = new DistinctCountDruidModule(); ++ final AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( ++ module.getJacksonModules(), ++ new GroupByQueryConfig(), ++ tempFolder ++ ); ++ final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); ++ RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory(true); ++ MutableBitmap bitmap1 = bitmapFactory.makeEmptyMutableBitmap(); ++ bitmap1.add(1); ++ ++ columnSelectorFactory.setRow(new MapBasedRow(0, ++ ImmutableMap.<String, Object>of( ++ "kylin", ++ bitmapFactory.makeImmutableBitmap(bitmap1) ++ ) ++ )); ++ ImmutableBitmap[] bitmaps = helper.runRelocateVerificationTest( ++ new DistinctCountAggregatorFactory( ++ "kylin", ++ "kylin" ++ ), ++ columnSelectorFactory, ++ ImmutableBitmap.class ++ ); ++ Assert.assertEquals(bitmaps[0].size(), bitmaps[1].size(), 0); ++ } ++} +diff --git a/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java +new file mode 100644 +index 000000000..912189d57 +--- /dev/null ++++ b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java +@@ -0,0 +1,234 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.distinctcount; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import com.google.common.collect.ImmutableMap; ++import com.google.common.collect.Lists; ++import io.druid.collections.bitmap.MutableBitmap; ++import io.druid.collections.bitmap.RoaringBitmapFactory; ++import io.druid.data.input.MapBasedInputRow; ++import io.druid.data.input.Row; ++import io.druid.jackson.DefaultObjectMapper; ++import io.druid.java.util.common.Intervals; ++import io.druid.query.QueryRunnerTestHelper; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.dimension.DefaultDimensionSpec; ++import io.druid.query.dimension.DimensionSpec; ++import io.druid.query.groupby.GroupByQuery; ++import io.druid.query.groupby.GroupByQueryConfig; ++import io.druid.query.groupby.GroupByQueryRunnerFactory; ++import io.druid.query.groupby.GroupByQueryRunnerTest; ++import io.druid.query.groupby.GroupByQueryRunnerTestHelper; ++import io.druid.query.groupby.orderby.DefaultLimitSpec; ++import io.druid.query.groupby.orderby.OrderByColumnSpec; ++import io.druid.segment.IndexIO; ++import io.druid.segment.IndexMerger; ++import io.druid.segment.IndexMergerTestBase; ++import io.druid.segment.IndexMergerV9; ++import io.druid.segment.IndexSpec; ++import io.druid.segment.QueryableIndex; ++import io.druid.segment.QueryableIndexSegment; ++import io.druid.segment.TestHelper; ++import io.druid.segment.column.ColumnConfig; ++import io.druid.segment.data.CompressedObjectStrategy; ++import io.druid.segment.data.CompressionFactory; ++import io.druid.segment.data.ConciseBitmapSerdeFactory; ++import io.druid.segment.incremental.IncrementalIndex; ++import io.druid.segment.incremental.IncrementalIndexSchema; ++import io.druid.segment.serde.ComplexMetrics; ++import org.apache.commons.io.FileUtils; ++import org.junit.Test; ++ ++import java.io.File; ++import java.util.Arrays; ++import java.util.List; ++ ++public class DistinctCountGroupByQueryTest ++{ ++ ++ private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec( ++ new ConciseBitmapSerdeFactory(), ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressionFactory.LongEncodingStrategy.LONGS ++ ); ++ private static IndexMerger INDEX_MERGER; ++ ++ private static ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); ++ ++ private static IndexIO INDEX_IO; ++ ++ static { ++ if (ComplexMetrics.getSerdeForType("kylin-distinctCount") == null) { ++ ComplexMetrics.registerSerde("kylin-distinctCount", new DistinctCountSerde()); ++ } ++ ++ for (Module mod : new DistinctCountDruidModule().getJacksonModules()) { ++ JSON_MAPPER.registerModule(mod); ++ } ++ ++ INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() ++ { ++ @Override ++ public int columnCacheSizeBytes() ++ { ++ return 0; ++ } ++ }); ++ ++ INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO); ++ } ++ ++ @Test ++ public void testGroupByWithDistinctCountAgg() throws Exception ++ { ++ final GroupByQueryConfig config = new GroupByQueryConfig(); ++ config.setMaxIntermediateRows(100000000); ++ final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config); ++ ++ String visitor_id = "visitor_id"; ++ String client_type = "client_type"; ++ String kylin_distinctCount = "kylin"; ++ long timestamp = System.currentTimeMillis(); ++ ++ IncrementalIndex index = new IncrementalIndex.Builder() ++ .setIndexSchema( ++ new IncrementalIndexSchema.Builder() ++ .withMetrics(new AggregatorFactory[]{ ++ new DistinctCountAggregatorFactory( ++ "kylin", ++ "kylin" ++ ) ++ }) ++ .withRollup(false) ++ .build() ++ ) ++ .setReportParseExceptions(false) ++ .setConcurrentEventAdd(true) ++ .setMaxRowCount(1000000) ++ .buildOnheap(); ++ ++ RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory(true); ++ MutableBitmap bitmap1 = bitmapFactory.makeEmptyMutableBitmap(); ++ bitmap1.add(1); ++ bitmap1.add(2); ++ bitmap1.add(3); ++ ++ MutableBitmap bitmap2 = bitmapFactory.makeEmptyMutableBitmap(); ++ bitmap2.add(4); ++ ++ MutableBitmap bitmap3 = bitmapFactory.makeEmptyMutableBitmap(); ++ bitmap3.add(1); ++ ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ kylin_distinctCount, ++ bitmapFactory.makeImmutableBitmap(bitmap1) ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ kylin_distinctCount, ++ bitmapFactory.makeImmutableBitmap(bitmap2) ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "android", ++ kylin_distinctCount, ++ bitmapFactory.makeImmutableBitmap(bitmap3) ++ ) ++ )); ++ ++ final File finalFile = new File("tmp"); ++ INDEX_MERGER.persist( ++ index, ++ Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"), ++ finalFile, ++ INDEX_SPEC ++ ); ++ ++ QueryableIndex queryableIndex = INDEX_IO.loadIndex(finalFile); ++ QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex); ++ ++ GroupByQuery query = new GroupByQuery.Builder().setDataSource(QueryRunnerTestHelper.dataSource) ++ .setGranularity(QueryRunnerTestHelper.allGran) ++ .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec( ++ client_type, ++ client_type ++ ))) ++ .setInterval(QueryRunnerTestHelper.fullOnInterval) ++ .setLimitSpec( ++ new DefaultLimitSpec( ++ Lists.newArrayList( ++ new OrderByColumnSpec( ++ client_type, ++ OrderByColumnSpec.Direction.DESCENDING ++ ) ++ ), 10)) ++ .setAggregatorSpecs(Lists.newArrayList(new DistinctCountAggregatorFactory( ++ "kylin", ++ "kylin" ++ ))) ++ .build(); ++ ++ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, factory.createRunner(segment), query); ++ ++ bitmap1.or(bitmap2); ++ System.out.println("bitmap1 :" + bitmap1.size()); ++ System.out.println("bitmap3 :" + bitmap3.size()); ++ ++ List<Row> expectedResults = Arrays.asList( ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "iphone", ++ "kylin", bitmap1 ++ ), ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "android", ++ "kylin", bitmap3 ++ ) ++ ); ++ ++ TestHelper.assertExpectedObjects(expectedResults, results, "distinct-count"); ++ ++ FileUtils.deleteDirectory(finalFile); ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/pom.xml b/extensions-contrib/kylin-extendcolumn/pom.xml +new file mode 100644 +index 000000000..50e192cd3 +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/pom.xml +@@ -0,0 +1,59 @@ ++<?xml version="1.0" encoding="UTF-8"?> ++<!-- ++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ ~ or more contributor license agreements. See the NOTICE file ++ ~ distributed with this work for additional information ++ ~ regarding copyright ownership. Metamarkets licenses this file ++ ~ to you under the Apache License, Version 2.0 (the ++ ~ "License"); you may not use this file except in compliance ++ ~ with the License. You may obtain a copy of the License at ++ ~ ++ ~ http://www.apache.org/licenses/LICENSE-2.0 ++ ~ ++ ~ Unless required by applicable law or agreed to in writing, ++ ~ software distributed under the License is distributed on an ++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ ~ KIND, either express or implied. See the License for the ++ ~ specific language governing permissions and limitations ++ ~ under the License. ++ --> ++ ++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ++ <modelVersion>4.0.0</modelVersion> ++ ++ <groupId>io.druid.extensions.contrib</groupId> ++ <artifactId>druid-kylin-extendcolumn</artifactId> ++ <name>druid-kylin-extendcolumn</name> ++ <description>druid-kylin-extendcolumn</description> ++ ++ <parent> ++ <groupId>io.druid</groupId> ++ <artifactId>druid</artifactId> ++ <version>0.11.1-SNAPSHOT</version> ++ <relativePath>../../pom.xml</relativePath> ++ </parent> ++ ++ <dependencies> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>provided</scope> ++ </dependency> ++ ++ <!-- Tests --> ++ <dependency> ++ <groupId>io.druid</groupId> ++ <artifactId>druid-processing</artifactId> ++ <version>${project.parent.version}</version> ++ <scope>test</scope> ++ <type>test-jar</type> ++ </dependency> ++ <dependency> ++ <groupId>junit</groupId> ++ <artifactId>junit</artifactId> ++ <scope>test</scope> ++ </dependency> ++ </dependencies> ++ ++</project> +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java +new file mode 100644 +index 000000000..6260639a1 +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java +@@ -0,0 +1,83 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import io.druid.query.aggregation.Aggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++public class ExtendColumnAggregator implements Aggregator ++{ ++ ++ private final ObjectColumnSelector selector; ++ private String result; ++ ++ public ExtendColumnAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void aggregate() ++ { ++ String tmp = (String) selector.getObject(); ++ ++ if (tmp == null || tmp.length() == 0) { ++ return; ++ } ++ ++ result = tmp; ++ } ++ ++ @Override ++ public void reset() ++ { ++ result = null; ++ } ++ ++ @Override ++ public Object get() ++ { ++ return result; ++ } ++ ++ @Override ++ public void close() ++ { ++ result = null; ++ } ++ ++ @Override ++ public float getFloat() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble() ++ { ++ throw new UnsupportedOperationException("ExtendColumnAggregator does not support getDouble()"); ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java +new file mode 100644 +index 000000000..a732c31de +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java +@@ -0,0 +1,227 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonProperty; ++import com.google.common.base.Preconditions; ++ ++import io.druid.java.util.common.IAE; ++import io.druid.java.util.common.StringUtils; ++import io.druid.query.aggregation.Aggregator; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.aggregation.AggregatorUtil; ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ColumnSelectorFactory; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.nio.ByteBuffer; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.List; ++ ++public class ExtendColumnAggregatorFactory extends AggregatorFactory ++{ ++ private static final byte CACHE_TYPE_ID = 26; ++ ++ private final String name; ++ private final String fieldName; ++ private final Integer precision; ++ ++ public static final String EXTEND_COLUMN = "kylin-extendcolumn"; ++ ++ @JsonCreator ++ public ExtendColumnAggregatorFactory( ++ @JsonProperty("name") String name, ++ @JsonProperty("fieldName") String fieldName, ++ @JsonProperty("precision") Integer precision ++ ) ++ { ++ Preconditions.checkNotNull(name); ++ Preconditions.checkNotNull(fieldName); ++ this.name = name; ++ this.fieldName = fieldName; ++ this.precision = precision; ++ } ++ ++ @Override ++ public Aggregator factorize(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ if (selector == null) { ++ throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null"); ++ } else { ++ return new ExtendColumnAggregator(selector); ++ } ++ } ++ ++ @Override ++ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) ++ { ++ ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName); ++ ++ final Class classOfObject = selector.classOfObject(); ++ if (!classOfObject.equals(Object.class) && !String.class.isAssignableFrom(classOfObject)) { ++ throw new IAE("Incompatible type for metric[%s], expected a ExtendByteArray, got a %s", fieldName, classOfObject); ++ } ++ ++ return new ExtendColumnBufferAggregator(selector); ++ } ++ ++ @Override ++ public Comparator getComparator() ++ { ++ return new Comparator() ++ { ++ @Override ++ public int compare(Object o1, Object o2) ++ { ++ String left = (String) o1; ++ ++ if (left == null || left.length() == 0) { ++ return -1; ++ } ++ ++ return 1; ++ } ++ }; ++ } ++ ++ @Override ++ public Object combine(Object lhs, Object rhs) ++ { ++ String left = (String) lhs; ++ ++ if (left == null || left.length() == 0) { ++ return rhs; ++ } ++ ++ return left; ++ } ++ ++ @Override ++ public AggregatorFactory getCombiningFactory() ++ { ++ return new ExtendColumnAggregatorFactory(name, name, precision); ++ } ++ ++ @Override ++ public List<AggregatorFactory> getRequiredColumns() ++ { ++ return Arrays.<AggregatorFactory>asList(new ExtendColumnAggregatorFactory(fieldName, fieldName, precision)); ++ } ++ ++ @Override ++ public Object deserialize(Object object) ++ { ++ return object; ++ } ++ ++ @Override ++ public Object finalizeComputation(Object object) ++ { ++ return object; ++ } ++ ++ @JsonProperty ++ public String getFieldName() ++ { ++ return fieldName; ++ } ++ ++ @Override ++ @JsonProperty ++ public String getName() ++ { ++ return name; ++ } ++ ++ @JsonProperty ++ public Integer getPrecision() ++ { ++ return precision; ++ } ++ ++ @Override ++ public List<String> requiredFields() ++ { ++ return Arrays.asList(fieldName); ++ } ++ ++ @Override ++ public byte[] getCacheKey() ++ { ++ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); ++ return ByteBuffer.allocate(2 + fieldNameBytes.length) ++ .put(CACHE_TYPE_ID) ++ .put(fieldNameBytes) ++ .put(AggregatorUtil.STRING_SEPARATOR) ++ .array(); ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return EXTEND_COLUMN; ++ } ++ ++ @Override ++ public int getMaxIntermediateSize() ++ { ++ return precision + 2; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if (this == o) { ++ return true; ++ } ++ if (o == null || getClass() != o.getClass()) { ++ return false; ++ } ++ ++ ExtendColumnAggregatorFactory that = (ExtendColumnAggregatorFactory) o; ++ ++ if (!fieldName.equals(that.fieldName)) { ++ return false; ++ } ++ ++ if (!name.equals(that.name)) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ @Override ++ public int hashCode() ++ { ++ int result = name.hashCode(); ++ result = 31 * result + fieldName.hashCode(); ++ return result; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return "ExtendColumnAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}'; ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java +new file mode 100644 +index 000000000..55a57bc34 +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java +@@ -0,0 +1,117 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import io.druid.query.aggregation.BufferAggregator; ++import io.druid.segment.ObjectColumnSelector; ++ ++import java.nio.ByteBuffer; ++import java.nio.charset.Charset; ++ ++public class ExtendColumnBufferAggregator implements BufferAggregator ++{ ++ private final ObjectColumnSelector selector; ++ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); ++ ++ public ExtendColumnBufferAggregator(ObjectColumnSelector selector) ++ { ++ this.selector = selector; ++ } ++ ++ @Override ++ public void init(ByteBuffer buf, int position) ++ { ++ int oldPosition = buf.position(); ++ ++ buf.position(position); ++ buf.putShort((short) 0); ++ ++ buf.position(oldPosition); ++ } ++ ++ @Override ++ public void aggregate(ByteBuffer buf, int position) ++ { ++ String tmp = (String) selector.getObject(); ++ ++ if (tmp == null || tmp.length() == 0) { ++ return; ++ } ++ ++ int oldPosition = buf.position(); ++ ++ byte[] bytes = tmp.getBytes(UTF8_CHARSET); ++ short size = (short) bytes.length; ++ ++ buf.position(position); ++ buf.putShort(size); ++ buf.put(bytes); ++ ++ buf.position(oldPosition); ++ } ++ ++ @Override ++ public Object get(ByteBuffer buf, int position) ++ { ++ int oldPosition = buf.position(); ++ int oldLimit = buf.limit(); ++ ++ buf.position(position); ++ short size = buf.getShort(); ++ if (size == 0) { ++ buf.position(oldPosition); ++ return null; ++ } ++ ++ buf.limit(position + size + 2); ++ ++ byte[] bytes = new byte[buf.remaining()]; ++ buf.get(bytes, 0, bytes.length); ++ String result = new String(bytes, 0, bytes.length, UTF8_CHARSET); ++ ++ buf.limit(oldLimit); ++ buf.position(oldPosition); ++ ++ return result; ++ } ++ ++ @Override ++ public float getFloat(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getFloat()"); ++ } ++ ++ @Override ++ public long getLong(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getLong()"); ++ } ++ ++ @Override ++ public double getDouble(ByteBuffer buf, int position) ++ { ++ throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getDouble()"); ++ } ++ ++ @Override ++ public void close() ++ { ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java +new file mode 100644 +index 000000000..1c383d97a +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java +@@ -0,0 +1,51 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.jsontype.NamedType; ++import com.fasterxml.jackson.databind.module.SimpleModule; ++import com.google.common.collect.ImmutableList; ++import com.google.inject.Binder; ++import io.druid.initialization.DruidModule; ++import io.druid.segment.serde.ComplexMetrics; ++ ++import java.util.List; ++ ++public class ExtendColumnDruidModule implements DruidModule ++{ ++ @Override ++ public List<? extends Module> getJacksonModules() ++ { ++ return ImmutableList.of( ++ new SimpleModule("KylinExtendColumnModule").registerSubtypes( ++ new NamedType(ExtendColumnAggregatorFactory.class, ExtendColumnAggregatorFactory.EXTEND_COLUMN) ++ ) ++ ); ++ } ++ ++ @Override ++ public void configure(Binder binder) ++ { ++ if (ComplexMetrics.getSerdeForType(ExtendColumnAggregatorFactory.EXTEND_COLUMN) == null) { ++ ComplexMetrics.registerSerde(ExtendColumnAggregatorFactory.EXTEND_COLUMN, new ExtendColumnSerde()); ++ } ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java +new file mode 100644 +index 000000000..9969d232b +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java +@@ -0,0 +1,139 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import io.druid.data.input.InputRow; ++import io.druid.java.util.common.IAE; ++import io.druid.segment.GenericColumnSerializer; ++import io.druid.segment.column.ColumnBuilder; ++import io.druid.segment.data.GenericIndexed; ++import io.druid.segment.data.IOPeon; ++import io.druid.segment.data.ObjectStrategy; ++import io.druid.segment.serde.ComplexColumnPartSupplier; ++import io.druid.segment.serde.ComplexMetricExtractor; ++import io.druid.segment.serde.ComplexMetricSerde; ++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; ++ ++import java.nio.ByteBuffer; ++import java.nio.charset.Charset; ++ ++public class ExtendColumnSerde extends ComplexMetricSerde ++{ ++ ++ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); ++ ++ public ExtendColumnSerde() ++ { ++ } ++ ++ @Override ++ public String getTypeName() ++ { ++ return ExtendColumnAggregatorFactory.EXTEND_COLUMN; ++ } ++ ++ @Override ++ public ComplexMetricExtractor getExtractor() ++ { ++ return new ComplexMetricExtractor() ++ { ++ @Override ++ public Class<String> extractedClass() ++ { ++ return String.class; ++ } ++ ++ @Override ++ public String extractValue(InputRow inputRow, String metricName) ++ { ++ Object rawValue = inputRow.getRaw(metricName); ++ ++ if (String.class.isAssignableFrom(rawValue.getClass())) { ++ return (String) rawValue; ++ } else { ++ throw new IAE("The class must be ExtendByteArray"); ++ } ++ } ++ }; ++ } ++ ++ @Override ++ public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder) ++ { ++ final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy()); ++ columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); ++ } ++ ++ @Override ++ public ObjectStrategy getObjectStrategy() ++ { ++ return new ObjectStrategy() ++ { ++ @Override ++ public Class getClazz() ++ { ++ return String.class; ++ } ++ ++ @Override ++ public Object fromByteBuffer(ByteBuffer buffer, int numBytes) ++ { ++ final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); ++ readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); ++ ++ byte[] bytes = new byte[readOnlyBuffer.remaining()]; ++ readOnlyBuffer.get(bytes, 0, bytes.length); ++ return new String(bytes, 0, bytes.length, UTF8_CHARSET); ++ } ++ ++ @Override ++ public byte[] toBytes(Object val) ++ { ++ if (val == null) { ++ return new byte[]{}; ++ } ++ ++ if (val instanceof String) { ++ return ((String) val).getBytes(UTF8_CHARSET); ++ } else { ++ throw new IAE("Unknown class[%s], toString[%s]", val.getClass(), val); ++ } ++ } ++ ++ @Override ++ public int compare(Object o1, Object o2) ++ { ++ String left = (String) o1; ++ ++ if (left == null || left.length() == 0) { ++ return -1; ++ } ++ ++ return 1; ++ } ++ }; ++ } ++ ++ @Override ++ public GenericColumnSerializer getSerializer(IOPeon peon, String column) ++ { ++ return LargeColumnSupportedComplexColumnSerializer.create(peon, column, this.getObjectStrategy()); ++ } ++} +diff --git a/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +new file mode 100644 +index 000000000..18982d1ae +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +@@ -0,0 +1 @@ ++io.druid.query.aggregation.kylin.extendcolumn.ExtendColumnDruidModule +diff --git a/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java b/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java +new file mode 100644 +index 000000000..2267d07d3 +--- /dev/null ++++ b/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java +@@ -0,0 +1,229 @@ ++/* ++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. Metamarkets licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package io.druid.query.aggregation.kylin.extendcolumn; ++ ++import com.fasterxml.jackson.databind.Module; ++import com.fasterxml.jackson.databind.ObjectMapper; ++import com.google.common.collect.ImmutableMap; ++import com.google.common.collect.Lists; ++import io.druid.data.input.MapBasedInputRow; ++import io.druid.data.input.Row; ++import io.druid.jackson.DefaultObjectMapper; ++import io.druid.java.util.common.Intervals; ++import io.druid.query.QueryRunnerTestHelper; ++import io.druid.query.aggregation.AggregatorFactory; ++import io.druid.query.dimension.DefaultDimensionSpec; ++import io.druid.query.dimension.DimensionSpec; ++import io.druid.query.groupby.GroupByQuery; ++import io.druid.query.groupby.GroupByQueryConfig; ++import io.druid.query.groupby.GroupByQueryRunnerFactory; ++import io.druid.query.groupby.GroupByQueryRunnerTest; ++import io.druid.query.groupby.GroupByQueryRunnerTestHelper; ++import io.druid.query.groupby.orderby.DefaultLimitSpec; ++import io.druid.query.groupby.orderby.OrderByColumnSpec; ++import io.druid.segment.IndexIO; ++import io.druid.segment.IndexMerger; ++import io.druid.segment.IndexMergerTestBase; ++import io.druid.segment.IndexMergerV9; ++import io.druid.segment.IndexSpec; ++import io.druid.segment.QueryableIndex; ++import io.druid.segment.QueryableIndexSegment; ++import io.druid.segment.TestHelper; ++import io.druid.segment.column.ColumnConfig; ++import io.druid.segment.data.CompressedObjectStrategy; ++import io.druid.segment.data.CompressionFactory; ++import io.druid.segment.data.ConciseBitmapSerdeFactory; ++import io.druid.segment.incremental.IncrementalIndex; ++import io.druid.segment.incremental.IncrementalIndexSchema; ++import io.druid.segment.serde.ComplexMetrics; ++import org.apache.commons.io.FileUtils; ++import org.junit.Test; ++ ++import java.io.File; ++import java.nio.charset.Charset; ++import java.util.Arrays; ++import java.util.List; ++ ++public class ExtendColumnGroupByQueryTest ++{ ++ ++ private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec( ++ new ConciseBitmapSerdeFactory(), ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressedObjectStrategy.CompressionStrategy.LZ4, ++ CompressionFactory.LongEncodingStrategy.LONGS ++ ); ++ private static IndexMerger INDEX_MERGER; ++ ++ private static ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); ++ ++ private static IndexIO INDEX_IO; ++ ++ static { ++ if (ComplexMetrics.getSerdeForType(ExtendColumnAggregatorFactory.EXTEND_COLUMN) == null) { ++ ComplexMetrics.registerSerde(ExtendColumnAggregatorFactory.EXTEND_COLUMN, new ExtendColumnSerde()); ++ } ++ ++ for (Module mod : new ExtendColumnDruidModule().getJacksonModules()) { ++ JSON_MAPPER.registerModule(mod); ++ } ++ ++ INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig() ++ { ++ @Override ++ public int columnCacheSizeBytes() ++ { ++ return 0; ++ } ++ }); ++ ++ INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO); ++ } ++ ++ @Test ++ public void testGroupByWithDistinctCountAgg() throws Exception ++ { ++ final GroupByQueryConfig config = new GroupByQueryConfig(); ++ config.setMaxIntermediateRows(100000000); ++ final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config); ++ ++ String visitor_id = "visitor_id"; ++ String client_type = "client_type"; ++ String kylin_extendcolumn = "kylin"; ++ long timestamp = System.currentTimeMillis(); ++ ++ IncrementalIndex index = new IncrementalIndex.Builder() ++ .setIndexSchema( ++ new IncrementalIndexSchema.Builder() ++ .withMetrics(new AggregatorFactory[]{ ++ new ExtendColumnAggregatorFactory( ++ "kylin", ++ "kylin", ++ 100 ++ ) ++ }) ++ .withRollup(false) ++ .build() ++ ) ++ .setReportParseExceptions(false) ++ .setConcurrentEventAdd(true) ++ .setMaxRowCount(1000000) ++ .buildOnheap(); ++ ++ final Charset UTF8_CHARSET = Charset.forName("UTF-8"); ++ ++ byte[] bytes1 = new byte[]{}; ++ ++ String array1 = new String(bytes1, 0, bytes1.length, UTF8_CHARSET); ++ ++ byte[] bytes2 = new String("中国").getBytes(UTF8_CHARSET); ++ String array2 = new String(bytes2, UTF8_CHARSET); ++ String array3 = new String("342"); ++ ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ kylin_extendcolumn, ++ array1 ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "iphone", ++ kylin_extendcolumn, ++ array2 ++ ) ++ )); ++ index.add(new MapBasedInputRow( ++ timestamp, ++ Lists.newArrayList(visitor_id, client_type), ++ ImmutableMap.<String, Object>of( ++ visitor_id, ++ "0", ++ client_type, ++ "android", ++ kylin_extendcolumn, ++ array3 ++ ) ++ )); ++ ++ final File finalFile = new File("tmp"); ++ INDEX_MERGER.persist( ++ index, ++ Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"), ++ finalFile, ++ INDEX_SPEC ++ ); ++ ++ QueryableIndex queryableIndex = INDEX_IO.loadIndex(finalFile); ++ QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex); ++ ++ GroupByQuery query = new GroupByQuery.Builder().setDataSource(QueryRunnerTestHelper.dataSource) ++ .setGranularity(QueryRunnerTestHelper.allGran) ++ .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec( ++ client_type, ++ client_type ++ ))) ++ .setInterval(QueryRunnerTestHelper.fullOnInterval) ++ .setLimitSpec( ++ new DefaultLimitSpec( ++ Lists.newArrayList( ++ new OrderByColumnSpec( ++ client_type, ++ OrderByColumnSpec.Direction.DESCENDING ++ ) ++ ), 10)) ++ .setAggregatorSpecs(Lists.newArrayList(new ExtendColumnAggregatorFactory( ++ "kylin", ++ "kylin", ++ 100 ++ ))) ++ .build(); ++ ++ Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, factory.createRunner(segment), query); ++ ++ List<Row> expectedResults = Arrays.asList( ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "iphone", ++ "kylin", array2 ++ ), ++ GroupByQueryRunnerTestHelper.createExpectedRow( ++ "1970-01-01T00:00:00.000Z", ++ client_type, "android", ++ "kylin", array3 ++ ) ++ ); ++ ++ TestHelper.assertExpectedObjects(expectedResults, results, "extend-column"); ++ ++ FileUtils.deleteDirectory(finalFile); ++ } ++} +diff --git a/pom.xml b/pom.xml +index bc5e967b3..59cfeb6d1 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -130,6 +130,9 @@ + <module>extensions-contrib/rabbitmq</module> + <module>extensions-contrib/distinctcount</module> + <module>extensions-contrib/parquet-extensions</module> ++ <module>extensions-contrib/kylin-distinccount</module> ++ <module>extensions-contrib/kylin-extendcolumn</module> ++ <module>extensions-contrib/decimal</module> + <module>extensions-contrib/statsd-emitter</module> + <module>extensions-contrib/orc-extensions</module> + <module>extensions-contrib/time-min-max</module> +diff --git a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +index 632777f8d..5d81f3964 100644 +--- a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java ++++ b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java +@@ -35,6 +35,7 @@ import io.druid.java.util.common.guava.Yielder; + import org.joda.time.DateTimeZone; + + import java.io.IOException; ++import java.math.BigDecimal; + import java.nio.ByteOrder; + import java.util.TimeZone; + +@@ -48,6 +49,23 @@ public class DruidDefaultSerializersModule extends SimpleModule + + JodaStuff.register(this); + ++ addSerializer( ++ BigDecimal.class, ++ new JsonSerializer<BigDecimal>() ++ { ++ @Override ++ public void serialize( ++ BigDecimal bigDecimal, ++ JsonGenerator jsonGenerator, ++ SerializerProvider serializerProvider ++ ) ++ throws IOException, JsonProcessingException ++ { ++ jsonGenerator.writeString(bigDecimal.toString()); ++ } ++ } ++ ); ++ + addDeserializer( + DateTimeZone.class, + new JsonDeserializer<DateTimeZone>() +diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java +index f5196d062..9a96bd060 100644 +--- a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java ++++ b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java +@@ -109,7 +109,7 @@ public class IndexMergerTestBase + ); + } + +- static IndexSpec makeIndexSpec( ++ public static IndexSpec makeIndexSpec( + BitmapSerdeFactory bitmapSerdeFactory, + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy, +diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java +index 5ca389305..6ca65ebc7 100644 +--- a/processing/src/test/java/io/druid/segment/TestHelper.java ++++ b/processing/src/test/java/io/druid/segment/TestHelper.java +@@ -22,6 +22,7 @@ package io.druid.segment; + import com.fasterxml.jackson.databind.InjectableValues; + import com.fasterxml.jackson.databind.ObjectMapper; + import com.google.common.collect.Lists; ++import io.druid.collections.bitmap.ImmutableBitmap; + import io.druid.data.input.MapBasedRow; + import io.druid.data.input.Row; + import io.druid.jackson.DefaultObjectMapper; +@@ -318,7 +319,13 @@ public class TestHelper + final Object expectedValue = expectedMap.get(key); + final Object actualValue = actualMap.get(key); + +- if (expectedValue instanceof Float || expectedValue instanceof Double) { ++ if (expectedValue instanceof ImmutableBitmap || actualValue instanceof ImmutableBitmap) { ++ Assert.assertEquals( ++ StringUtils.format("%s: key[%s]", msg, key), ++ ((ImmutableBitmap) expectedValue).size(), ++ ((ImmutableBitmap) actualValue).size() ++ ); ++ } else if (expectedValue instanceof Float || expectedValue instanceof Double) { + Assert.assertEquals( + StringUtils.format("%s: key[%s]", msg, key), + ((Number) expectedValue).doubleValue(), +-- +2.17.2 (Apple Git-113) +