This is an automated email from the ASF dual-hosted git repository.

kangkaisen pushed a commit to branch kylin-on-druid
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-druid by this push:
     new 4b6668a  Add patch for precise-distinct-count,extend-column,decima 
measure
4b6668a is described below

commit 4b6668af91c14a318b3e06a7ac8deef91a1b326b
Author: kangkaisen <kangkai...@meituan.com>
AuthorDate: Thu Dec 27 20:43:51 2018 +0800

    Add patch for precise-distinct-count,extend-column,decima measure
---
 ...tinct-count-extend-column-decimal-measure.patch | 3960 ++++++++++++++++++++
 1 file changed, 3960 insertions(+)

diff --git a/kod-precise-distinct-count-extend-column-decimal-measure.patch 
b/kod-precise-distinct-count-extend-column-decimal-measure.patch
new file mode 100644
index 0000000..b7a0539
--- /dev/null
+++ b/kod-precise-distinct-count-extend-column-decimal-measure.patch
@@ -0,0 +1,3960 @@
+From 4e7daa032d5e6867666b65843e2804ffe1de2c67 Mon Sep 17 00:00:00 2001
+From: kangkaisen <kangkai...@meituan.com>
+Date: Wed, 15 Nov 2017 19:30:54 +0800
+Subject: [PATCH] Druid support kylin precise distinct count measure Druid
+ support kylin extend-column measure Druid support decimal measure
+
+---
+ .../bitmap/WrappedImmutableRoaringBitmap.java |   2 +
+ distribution/pom.xml                          |  12 +
+ extensions-contrib/decimal/pom.xml            |  59 +++++
+ .../decimal/DecimalAggregatorFactory.java     | 114 ++++++++
+ .../decimal/DecimalBufferAggregator.java      |  99 +++++++
+ .../decimal/DecimalDruidModule.java           |  71 +++++
+ .../decimal/DecimalMaxAggregator.java         |  82 ++++++
+ .../decimal/DecimalMaxAggregatorFactory.java  | 160 +++++++++++
+ .../decimal/DecimalMaxBufferAggregator.java   |  56 ++++
+ .../aggregation/decimal/DecimalMaxSerde.java  |  29 ++
+ .../decimal/DecimalMinAggregator.java         |  83 ++++++
+ .../decimal/DecimalMinAggregatorFactory.java  | 149 +++++++++++
+ .../decimal/DecimalMinBufferAggregator.java   |  56 ++++
+ .../aggregation/decimal/DecimalMinSerde.java  |  29 ++
+ .../aggregation/decimal/DecimalSerde.java     | 135 ++++++++++
+ .../decimal/DecimalSumAggregator.java         |  80 ++++++
+ .../decimal/DecimalSumAggregatorFactory.java  | 143 ++++++++++
+ .../decimal/DecimalSumBufferAggregator.java   |  54 ++++
+ .../aggregation/decimal/DecimalSumSerde.java  |  29 ++
+ .../io.druid.initialization.DruidModule       |   1 +
+ .../DecimalGroupByQueryTest.java              | 250 ++++++++++++++++++
+ extensions-contrib/kylin-distinccount/pom.xml |  59 +++++
+ .../DistinctCountAggregator.java              |  73 +++++
+ .../DistinctCountAggregatorFactory.java       | 223 ++++++++++++++++
+ .../DistinctCountBufferAggregator.java        | 120 +++++++++
+ .../DistinctCountDruidModule.java             |  53 ++++
+ .../DistinctCountSerde.java                   |  95 +++++++
+ .../io.druid.initialization.DruidModule       |   1 +
+ .../DistinctCountBufferAggregatorTest.java    |  71 +++++
+ .../DistinctCountGroupByQueryTest.java        | 234 ++++++++++++++++
+ extensions-contrib/kylin-extendcolumn/pom.xml |  59 +++++
+ .../ExtendColumnAggregator.java               |  83 ++++++
+ .../ExtendColumnAggregatorFactory.java        | 227 ++++++++++++++++
+ .../ExtendColumnBufferAggregator.java         | 117 ++++++++
+ .../ExtendColumnDruidModule.java              |  51 ++++
+ .../kylin.extendcolumn/ExtendColumnSerde.java | 139 ++++++++++
+ .../io.druid.initialization.DruidModule       |   1 +
+ .../ExtendColumnGroupByQueryTest.java         | 229 ++++++++++++++++
+ pom.xml                                       |   3 +
+ .../DruidDefaultSerializersModule.java        |  18 ++
+ .../io/druid/segment/IndexMergerTestBase.java |   2 +-
+ .../java/io/druid/segment/TestHelper.java     |   9 +-
+ 42 files changed, 3558 insertions(+), 2 deletions(-)
+ create mode 100644 extensions-contrib/decimal/pom.xml
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java
+ create mode 100644 
extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java
+ create mode 100644 
extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+ create mode 100644 
extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java
+ create mode 100644 extensions-contrib/kylin-distinccount/pom.xml
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java
+ create mode 100644 
extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java
+ create mode 100644 extensions-contrib/kylin-extendcolumn/pom.xml
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+ create mode 100644 
extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java
+
+diff --git 
a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java
 
b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java
+index 26454d07c..853e3b581 100755
+--- 
a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java
++++ 
b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java
+@@ -19,6 +19,7 @@
+ 
+ package io.druid.collections.bitmap;
+ 
++import com.fasterxml.jackson.annotation.JsonValue;
+ import com.google.common.base.Throwables;
+ import org.roaringbitmap.IntIterator;
+ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+@@ -55,6 +56,7 @@ public class WrappedImmutableRoaringBitmap implements 
ImmutableBitmap
+   }
+ 
+   @Override
++  @JsonValue
+   public byte[] toBytes()
+   {
+     try {
+diff --git a/distribution/pom.xml b/distribution/pom.xml
+index 751d1ded4..44192a66a 100644
+--- a/distribution/pom.xml
++++ b/distribution/pom.xml
+@@ -111,6 +111,12 @@
+                                 
<argument>io.druid.extensions:druid-examples</argument>
+                                 <argument>-c</argument>
+                                 
<argument>io.druid.extensions:simple-client-sslcontext</argument>
++                                <argument>-c</argument>
++                                
<argument>io.druid.extensions.contrib:druid-kylin-distinctcount</argument>
++                                <argument>-c</argument>
++                                
<argument>io.druid.extensions.contrib:druid-kylin-extendcolumn</argument>
++                                <argument>-c</argument>
++                                
<argument>io.druid.extensions.contrib:druid-decimal</argument>
+                                 
<argument>${druid.distribution.pulldeps.opts}</argument>
+                             </arguments>
+                         </configuration>
+@@ -215,6 +221,12 @@
+                                         <argument>-c</argument>
+                                         
<argument>io.druid.extensions.contrib:druid-distinctcount</argument>
+                                         <argument>-c</argument>
++                                        
<argument>io.druid.extensions.contrib:druid-kylin-distinctcount</argument>
++                                        <argument>-c</argument>
++                                        
<argument>io.druid.extensions.contrib:druid-kylin-extendcolumn</argument>
++                                        <argument>-c</argument>
++                                        
<argument>io.druid.extensions.contrib:druid-decimal</argument>
++                                        <argument>-c</argument>
+                                         
<argument>io.druid.extensions.contrib:druid-rocketmq</argument>
+                                         <argument>-c</argument>
+                                         
<argument>io.druid.extensions.contrib:druid-google-extensions</argument>
+diff --git a/extensions-contrib/decimal/pom.xml 
b/extensions-contrib/decimal/pom.xml
+new file mode 100644
+index 000000000..1f2740347
+--- /dev/null
++++ b/extensions-contrib/decimal/pom.xml
+@@ -0,0 +1,59 @@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ ~ or more contributor license agreements. See the NOTICE file
++ ~ distributed with this work for additional information
++ ~ regarding copyright ownership. Metamarkets licenses this file
++ ~ to you under the Apache License, Version 2.0 (the
++ ~ "License"); you may not use this file except in compliance
++ ~ with the License. You may obtain a copy of the License at
++ ~
++ ~ http://www.apache.org/licenses/LICENSE-2.0
++ ~
++ ~ Unless required by applicable law or agreed to in writing,
++ ~ software distributed under the License is distributed on an
++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ ~ KIND, either express or implied. See the License for the
++ ~ specific language governing permissions and limitations
++ ~ under the License.
++  -->
++
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
++  <modelVersion>4.0.0</modelVersion>
++
++  <groupId>io.druid.extensions.contrib</groupId>
++  <artifactId>druid-decimal</artifactId>
++  <name>druid-decimal</name>
++  <description>druid-decimal</description>
++
++  <parent>
++    <groupId>io.druid</groupId>
++    <artifactId>druid</artifactId>
++    <version>0.11.1-SNAPSHOT</version>
++    <relativePath>../../pom.xml</relativePath>
++  </parent>
++
++  <dependencies>
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>provided</scope>
++    </dependency>
++
++    <!-- Tests -->
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <dependency>
++      <groupId>junit</groupId>
++      <artifactId>junit</artifactId>
++      <scope>test</scope>
++    </dependency>
++  </dependencies>
++
++</project>
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java
+new file mode 100644
+index 000000000..e09925b34
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalAggregatorFactory.java
+@@ -0,0 +1,114 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.annotation.JsonProperty;
++import com.google.common.base.Preconditions;
++import io.druid.query.aggregation.AggregatorFactory;
++
++import java.math.BigDecimal;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.List;
++
++/**
++ * Shared base for the decimal aggregator factories. It stores the output name, the
++ * input field name and the declared precision, and implements the parts that do not
++ * depend on the concrete aggregation: result comparison, deserialization,
++ * finalization and intermediate-size estimation.
++ */
++public abstract class DecimalAggregatorFactory extends AggregatorFactory
++{
++  protected final String name;
++  protected final String fieldName;
++  // Declared decimal precision; within this class it is only used to size the buffer slot.
++  protected final Integer precision;
++
++
++  /**
++   * @param name      output name of the aggregator, must not be null
++   * @param fieldName name of the input column, must not be null
++   * @param precision declared decimal precision; may be null here, but is required
++   *                  by {@link #getMaxIntermediateSize()}
++   */
++  public DecimalAggregatorFactory(String name, String fieldName, Integer precision)
++  {
++    this.name = Preconditions.checkNotNull(name, "name");
++    this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName");
++    this.precision = precision;
++  }
++
++  /**
++   * @return a comparator that casts both arguments to BigDecimal and orders them
++   * with {@link BigDecimal#compareTo(BigDecimal)}
++   */
++  @Override
++  public Comparator getComparator()
++  {
++    return new Comparator()
++    {
++      @Override
++      public int compare(Object o1, Object o2)
++      {
++        return ((BigDecimal) o1).compareTo((BigDecimal) o2);
++      }
++    };
++  }
++
++  /**
++   * Parses a String into a BigDecimal; any other object is returned unchanged.
++   */
++  @Override
++  public Object deserialize(Object object)
++  {
++    if (object instanceof String) {
++      return new BigDecimal((String) object);
++    }
++    return object;
++  }
++
++  /**
++   * No finalization step: the intermediate object is returned as is.
++   */
++  @Override
++  public Object finalizeComputation(Object object)
++  {
++    return object;
++  }
++
++  @JsonProperty
++  public Integer getPrecision()
++  {
++    return precision;
++  }
++
++  @JsonProperty
++  public String getFieldName()
++  {
++    return fieldName;
++  }
++
++  @Override
++  @JsonProperty
++  public String getName()
++  {
++    return name;
++  }
++
++  @Override
++  public List<String> requiredFields()
++  {
++    return Arrays.asList(fieldName);
++  }
++
++
++  /**
++   * @return 6 bytes plus {@code (precision + 1) / 2} bytes. The 6 bytes correspond to
++   * the 2-byte size and 4-byte scale that DecimalBufferAggregator writes in front of
++   * the unscaled value.
++   * @throws NullPointerException if no precision was configured
++   */
++  @Override
++  public int getMaxIntermediateSize()
++  {
++    // Fail with a descriptive message rather than a bare NullPointerException from auto-unboxing.
++    Preconditions.checkNotNull(precision, "precision must be set for decimal aggregator[%s]", name);
++    return 6 + (precision + 1) / 2;
++  }
++
++  // Hash is derived from name and fieldName only; precision does not take part.
++  @Override
++  public int hashCode()
++  {
++    int result = name.hashCode();
++    result = 31 * result + fieldName.hashCode();
++    return result;
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java
+new file mode 100644
+index 000000000..ff8967ba4
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalBufferAggregator.java
+@@ -0,0 +1,99 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.math.BigInteger;
++import java.nio.ByteBuffer;
++
++/**
++ * Base class for decimal aggregators that keep their state in a ByteBuffer.
++ * A value is stored at its position as a 2-byte total size, a 4-byte scale and
++ * then the bytes of the unscaled value; the total size includes the 6 header bytes.
++ */
++public abstract class DecimalBufferAggregator implements BufferAggregator
++{
++  protected final ObjectColumnSelector selector;
++
++  public DecimalBufferAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Decodes the BigDecimal stored at {@code position}. The buffer's own position and
++   * limit are moved temporarily and put back before the value is constructed.
++   */
++  protected BigDecimal readTFromBuffer(ByteBuffer buf, int position)
++  {
++    final int savedPosition = buf.position();
++    final int savedLimit = buf.limit();
++
++    buf.position(position);
++    final short totalSize = buf.getShort();
++    final int scale = buf.getInt();
++    // Narrow the limit so remaining() is exactly the unscaled-value payload.
++    buf.limit(position + totalSize);
++    final byte[] unscaled = new byte[buf.remaining()];
++    buf.get(unscaled);
++
++    buf.limit(savedLimit);
++    buf.position(savedPosition);
++
++    return new BigDecimal(new BigInteger(unscaled), scale);
++  }
++
++  /**
++   * Encodes {@code value} at {@code position} and restores the buffer's previous position.
++   */
++  protected void writeToBuffer(BigDecimal value, ByteBuffer buf, int position)
++  {
++    final int savedPosition = buf.position();
++
++    final byte[] unscaled = value.unscaledValue().toByteArray();
++    // Header (short size + int scale) is counted in the stored size.
++    final short totalSize = (short) (6 + unscaled.length);
++
++    buf.position(position);
++    buf.putShort(totalSize);
++    buf.putInt(value.scale());
++    buf.put(unscaled);
++
++    buf.position(savedPosition);
++  }
++
++  @Override
++  public Object get(ByteBuffer buf, int position)
++  {
++    return readTFromBuffer(buf, position);
++  }
++
++  // The primitive accessors are not supported; only get() yields a result.
++
++  @Override
++  public float getFloat(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("DecimalBufferAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("DecimalBufferAggregator does not support getLong()");
++  }
++
++  @Override
++  public double getDouble(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("DecimalBufferAggregator does not support getDouble()");
++  }
++
++  @Override
++  public void close()
++  {
++    // nothing to release
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java
+new file mode 100644
+index 000000000..ac7f69549
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalDruidModule.java
+@@ -0,0 +1,71 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.jsontype.NamedType;
++import com.fasterxml.jackson.databind.module.SimpleModule;
++import com.google.common.collect.ImmutableList;
++import com.google.inject.Binder;
++import io.druid.initialization.DruidModule;
++import io.druid.segment.serde.ComplexMetrics;
++
++import java.util.List;
++
++/**
++ * Druid module for the decimal extension: exposes the Jackson subtypes of the three
++ * decimal aggregator factories and registers their complex-metric serdes.
++ */
++public class DecimalDruidModule implements DruidModule
++{
++
++  public static final String DECIMALSUM = "decimalSum";
++  public static final String DECIMALMAX = "decimalMax";
++  public static final String DECIMALMIN = "decimalMin";
++
++  /**
++   * @return a single Jackson module that maps the three type names to their factories
++   */
++  @Override
++  public List<? extends Module> getJacksonModules()
++  {
++    // One module carries all three subtypes. Separately constructed SimpleModules that
++    // share the explicit name "DecimalModule" can be treated as duplicate registrations
++    // by Jackson versions that identify a SimpleModule by its name, which would drop
++    // all but the first of them.
++    return ImmutableList.of(
++        new SimpleModule("DecimalModule").registerSubtypes(
++            new NamedType(DecimalSumAggregatorFactory.class, DECIMALSUM),
++            new NamedType(DecimalMaxAggregatorFactory.class, DECIMALMAX),
++            new NamedType(DecimalMinAggregatorFactory.class, DECIMALMIN)
++        )
++    );
++  }
++
++  /**
++   * Registers a serde for each decimal type name unless one is already registered.
++   */
++  @Override
++  public void configure(Binder binder)
++  {
++    if (ComplexMetrics.getSerdeForType(DECIMALSUM) == null) {
++      ComplexMetrics.registerSerde(DECIMALSUM, new DecimalSumSerde());
++    }
++
++    if (ComplexMetrics.getSerdeForType(DECIMALMIN) == null) {
++      ComplexMetrics.registerSerde(DECIMALMIN, new DecimalMinSerde());
++    }
++
++    if (ComplexMetrics.getSerdeForType(DECIMALMAX) == null) {
++      ComplexMetrics.registerSerde(DECIMALMAX, new DecimalMaxSerde());
++    }
++
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java
+new file mode 100644
+index 000000000..7dca0ae56
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregator.java
+@@ -0,0 +1,82 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.query.aggregation.Aggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++
++/**
++ * On-heap aggregator that keeps the largest BigDecimal returned by its selector.
++ * The result is null until a non-null value has been aggregated.
++ */
++public class DecimalMaxAggregator implements Aggregator
++{
++  private final ObjectColumnSelector selector;
++  private BigDecimal max = null;
++
++  public DecimalMaxAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Folds the selector's current value into the running maximum. Null values are
++   * skipped.
++   */
++  @Override
++  public void aggregate()
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      // compareTo(null) throws NullPointerException, so a null row must not reach it.
++      return;
++    }
++    if (max == null || max.compareTo(value) < 0) {
++      max = value;
++    }
++  }
++
++  @Override
++  public void reset()
++  {
++    max = null;
++  }
++
++  /**
++   * @return the current maximum, or null if nothing has been aggregated since
++   * construction or the last reset
++   */
++  @Override
++  public Object get()
++  {
++    return max;
++  }
++
++  @Override
++  public void close()
++  {
++  }
++
++  // The primitive accessors are not supported; only get() yields a result.
++
++  @Override
++  public float getFloat()
++  {
++    throw new UnsupportedOperationException("DecimalMaxAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong()
++  {
++    throw new UnsupportedOperationException("DecimalMaxAggregator does not support getLong()");
++  }
++
++  @Override
++  public double getDouble()
++  {
++    throw new UnsupportedOperationException("DecimalMaxAggregator does not support getDouble()");
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java
+new file mode 100644
+index 000000000..91c826592
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxAggregatorFactory.java
+@@ -0,0 +1,160 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.annotation.JsonCreator;
++import com.fasterxml.jackson.annotation.JsonProperty;
++import io.druid.java.util.common.IAE;
++import io.druid.java.util.common.StringUtils;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.aggregation.AggregatorUtil;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ColumnSelectorFactory;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.List;
++
++/**
++ * Factory for the decimal max aggregation. It creates the on-heap and buffer
++ * aggregators for a field and reports {@link DecimalDruidModule#DECIMALMAX} as its
++ * type name.
++ */
++public class DecimalMaxAggregatorFactory extends DecimalAggregatorFactory
++{
++  private static final byte CACHE_TYPE_ID = 28;
++
++  @JsonCreator
++  public DecimalMaxAggregatorFactory(
++      @JsonProperty("name") String name,
++      @JsonProperty("fieldName") String fieldName,
++      @JsonProperty("precision") Integer precision
++  )
++  {
++    super(name, fieldName, precision);
++  }
++
++  /**
++   * @throws IAE if the column selector factory returns no selector for the field
++   */
++  @Override
++  public Aggregator factorize(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName);
++    if (selector == null) {
++      throw new IAE("selector in DecimalMaxAggregatorFactory should not be Null");
++    }
++    return new DecimalMaxAggregator(selector);
++  }
++
++  /**
++   * @throws IAE if there is no selector for the field, or if the selector's object
++   *             class is neither Object nor assignable to BigDecimal
++   */
++  @Override
++  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName);
++    // Same guard as factorize(): report a missing selector instead of failing on the call below.
++    if (selector == null) {
++      throw new IAE("selector in DecimalMaxAggregatorFactory should not be Null");
++    }
++
++    final Class classOfObject = selector.classOfObject();
++    if (!classOfObject.equals(Object.class) && !BigDecimal.class.isAssignableFrom(classOfObject)) {
++      throw new IAE("Incompatible type for metric[%s], expected a BigDecimal, got a %s", fieldName, classOfObject);
++    }
++
++    return new DecimalMaxBufferAggregator(selector);
++  }
++
++  @Override
++  public Comparator getComparator()
++  {
++    return new Comparator()
++    {
++      @Override
++      public int compare(Object o1, Object o2)
++      {
++        return ((BigDecimal) o1).compareTo((BigDecimal) o2);
++      }
++    };
++  }
++
++  /**
++   * @return the larger of the two BigDecimal arguments; the second one when they
++   * compare as equal
++   */
++  @Override
++  public Object combine(Object o1, Object o2)
++  {
++    BigDecimal left = (BigDecimal) o1;
++    BigDecimal right = (BigDecimal) o2;
++
++    return left.compareTo(right) > 0 ? left : right;
++  }
++
++  // The combining factory reads from the column named after this aggregator's output.
++  @Override
++  public AggregatorFactory getCombiningFactory()
++  {
++    return new DecimalMaxAggregatorFactory(name, name, precision);
++  }
++
++  @Override
++  public List<AggregatorFactory> getRequiredColumns()
++  {
++    return Arrays.<AggregatorFactory>asList(new DecimalMaxAggregatorFactory(fieldName, fieldName, precision));
++  }
++
++  /**
++   * @return the cache type id byte, the UTF-8 field name and a separator byte
++   */
++  @Override
++  public byte[] getCacheKey()
++  {
++    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
++    // 2 extra bytes: one for the type id, one for the separator.
++    return ByteBuffer.allocate(2 + fieldNameBytes.length)
++                     .put(CACHE_TYPE_ID)
++                     .put(fieldNameBytes)
++                     .put(AggregatorUtil.STRING_SEPARATOR)
++                     .array();
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALMAX;
++  }
++
++  // Equality is based on fieldName and name only; precision does not take part.
++  @Override
++  public boolean equals(Object o)
++  {
++    if (this == o) {
++      return true;
++    }
++    if (o == null || getClass() != o.getClass()) {
++      return false;
++    }
++
++    DecimalMaxAggregatorFactory that = (DecimalMaxAggregatorFactory) o;
++
++    return fieldName.equals(that.fieldName) && name.equals(that.name);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DecimalMaxAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}';
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java
+new file mode 100644
+index 000000000..d2a933c39
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxBufferAggregator.java
+@@ -0,0 +1,56 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++
++/**
++ * Buffer aggregator that keeps the largest BigDecimal seen for a column.
++ *
++ * The running maximum is stored and loaded through the writeToBuffer and
++ * readTFromBuffer helpers inherited from DecimalBufferAggregator.
++ */
++public class DecimalMaxBufferAggregator extends DecimalBufferAggregator
++{
++
++  public DecimalMaxBufferAggregator(ObjectColumnSelector selector)
++  {
++    super(selector);
++  }
++
++  /**
++   * Seeds the slot at the given position with Long.MIN_VALUE.
++   *
++   * NOTE(review): an input smaller than Long.MIN_VALUE can never replace
++   * this seed, so the maximum of such inputs would be reported wrongly.
++   */
++  @Override
++  public void init(ByteBuffer buf, int position)
++  {
++    writeToBuffer(BigDecimal.valueOf(Long.MIN_VALUE), buf, position);
++  }
++
++  /**
++   * Folds the selector's current value into the stored maximum.
++   *
++   * A null value is skipped; previously it caused a NullPointerException
++   * inside compareTo.
++   */
++  @Override
++  public void aggregate(ByteBuffer buf, int position)
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      return;
++    }
++
++    BigDecimal max = readTFromBuffer(buf, position);
++
++    if (max.compareTo(value) < 0) {
++      max = value;
++    }
++
++    writeToBuffer(max, buf, position);
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java
+new file mode 100644
+index 000000000..eccaec0ba
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMaxSerde.java
+@@ -0,0 +1,29 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++/**
++ * Serde for the decimal "max" metric. Everything except the reported type
++ * name is inherited from DecimalSerde.
++ */
++public class DecimalMaxSerde extends DecimalSerde
++{
++  /** Returns DecimalDruidModule.DECIMALMAX as this serde's type name. */
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALMAX;
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java
+new file mode 100644
+index 000000000..f69b6e1e3
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregator.java
+@@ -0,0 +1,83 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.query.aggregation.Aggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++
++/**
++ * On-heap aggregator that tracks the smallest BigDecimal produced by an
++ * object selector. The result is null until a non-null value is aggregated.
++ */
++public class DecimalMinAggregator implements Aggregator
++{
++  private final ObjectColumnSelector selector;
++  private BigDecimal min = null;
++
++  public DecimalMinAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Folds the selector's current value into the running minimum.
++   *
++   * Null values are ignored: they used to either overwrite nothing useful
++   * (first row) or throw a NullPointerException in compareTo (later rows).
++   */
++  @Override
++  public void aggregate()
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      return;
++    }
++
++    if (min == null || min.compareTo(value) > 0) {
++      min = value;
++    }
++  }
++
++  /** Forgets the running minimum so the aggregator can be reused. */
++  @Override
++  public void reset()
++  {
++    min = null;
++  }
++
++  /** Returns the current minimum, or null when nothing was aggregated. */
++  @Override
++  public Object get()
++  {
++    return min;
++  }
++
++  @Override
++  public void close()
++  {
++  }
++
++  @Override
++  public float getFloat()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalMinAggregator does not support getFloat()"
++    );
++  }
++
++  @Override
++  public long getLong()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalMinAggregator does not support getLong()"
++    );
++  }
++
++  @Override
++  public double getDouble()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalMinAggregator does not support getDouble()"
++    );
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java
+new file mode 100644
+index 000000000..9612033f4
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinAggregatorFactory.java
+@@ -0,0 +1,149 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.annotation.JsonCreator;
++import com.fasterxml.jackson.annotation.JsonProperty;
++import io.druid.java.util.common.IAE;
++import io.druid.java.util.common.StringUtils;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.aggregation.AggregatorUtil;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ColumnSelectorFactory;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.List;
++
++/**
++ * Aggregator factory that computes the minimum of a BigDecimal metric.
++ *
++ * Originally created by kangkaisen on 2017/11/28.
++ */
++public class DecimalMinAggregatorFactory extends DecimalAggregatorFactory
++{
++  private static final byte CACHE_TYPE_ID = 29;
++
++  @JsonCreator
++  public DecimalMinAggregatorFactory(
++      @JsonProperty("name") String name,
++      @JsonProperty("fieldName") String fieldName,
++      @JsonProperty("precision") Integer precision
++  )
++  {
++    super(name, fieldName, precision);
++  }
++
++  /**
++   * Creates the on-heap aggregator for fieldName.
++   *
++   * @throws IAE when the column factory yields no selector for fieldName
++   */
++  @Override
++  public Aggregator factorize(ColumnSelectorFactory columnFactory)
++  {
++    return new DecimalMinAggregator(requireSelector(columnFactory));
++  }
++
++  /**
++   * Creates the buffer aggregator for fieldName.
++   *
++   * @throws IAE when no selector is available (previously a
++   *             NullPointerException) or when the selector's declared class
++   *             is neither Object nor assignable to BigDecimal
++   */
++  @Override
++  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = requireSelector(columnFactory);
++
++    final Class<?> classOfObject = selector.classOfObject();
++    final boolean untyped = classOfObject.equals(Object.class);
++    if (!untyped && !BigDecimal.class.isAssignableFrom(classOfObject)) {
++      throw new IAE(
++          "Incompatible type for metric[%s], expected a BigDecimal, got a %s",
++          fieldName,
++          classOfObject
++      );
++    }
++
++    return new DecimalMinBufferAggregator(selector);
++  }
++
++  /** Looks up the object selector for fieldName, failing fast when absent. */
++  private ObjectColumnSelector requireSelector(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector =
++        columnFactory.makeObjectColumnSelector(fieldName);
++    if (selector == null) {
++      throw new IAE(
++          "selector in DecimalMinAggregatorFactory should not be null, field[%s]",
++          fieldName
++      );
++    }
++    return selector;
++  }
++
++  /**
++   * Combines two intermediate results by keeping the smaller one.
++   *
++   * A null operand (DecimalMinAggregator reports null when it saw no value)
++   * is treated as absent and the other operand is returned.
++   *
++   * @return the smaller operand; o2 when both compare as equal
++   */
++  @Override
++  public Object combine(Object o1, Object o2)
++  {
++    if (o1 == null) {
++      return o2;
++    }
++    if (o2 == null) {
++      return o1;
++    }
++
++    BigDecimal left = (BigDecimal) o1;
++    BigDecimal right = (BigDecimal) o2;
++
++    return left.compareTo(right) < 0 ? left : right;
++  }
++
++  /**
++   * Returns a factory that uses this factory's output name as both its name
++   * and its input field.
++   */
++  @Override
++  public AggregatorFactory getCombiningFactory()
++  {
++    return new DecimalMinAggregatorFactory(name, name, precision);
++  }
++
++  /**
++   * Returns a single-element list with a factory named after the input
++   * field.
++   */
++  @Override
++  public List<AggregatorFactory> getRequiredColumns()
++  {
++    final AggregatorFactory perField =
++        new DecimalMinAggregatorFactory(fieldName, fieldName, precision);
++    return Arrays.<AggregatorFactory>asList(perField);
++  }
++
++  /**
++   * Builds the cache key: type id byte, UTF-8 bytes of fieldName, separator.
++   *
++   * NOTE(review): precision is not part of the key - confirm that factories
++   * differing only in precision may share cached results.
++   */
++  @Override
++  public byte[] getCacheKey()
++  {
++    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
++    return ByteBuffer.allocate(2 + fieldNameBytes.length)
++                     .put(CACHE_TYPE_ID)
++                     .put(fieldNameBytes)
++                     .put(AggregatorUtil.STRING_SEPARATOR)
++                     .array();
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALMIN;
++  }
++
++  /**
++   * Equal when of the same class with equal name, fieldName and precision.
++   */
++  @Override
++  public boolean equals(Object o)
++  {
++    if (this == o) {
++      return true;
++    }
++    if (o == null || getClass() != o.getClass()) {
++      return false;
++    }
++
++    DecimalMinAggregatorFactory that = (DecimalMinAggregatorFactory) o;
++
++    return java.util.Objects.equals(fieldName, that.fieldName)
++           && java.util.Objects.equals(name, that.name)
++           && java.util.Objects.equals(precision, that.precision);
++  }
++
++  /** Hash over the same fields equals compares, keeping the two in sync. */
++  @Override
++  public int hashCode()
++  {
++    return java.util.Objects.hash(fieldName, name, precision);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DecimalMinAggregatorFactory {"
++           + "name='" + name + '\''
++           + ", fieldName='" + fieldName + '\''
++           + '}';
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java
+new file mode 100644
+index 000000000..db12f46eb
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinBufferAggregator.java
+@@ -0,0 +1,56 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++
++/**
++ * Buffer aggregator that keeps the smallest BigDecimal seen for a column.
++ *
++ * The running minimum is stored and loaded through the writeToBuffer and
++ * readTFromBuffer helpers inherited from DecimalBufferAggregator.
++ */
++public class DecimalMinBufferAggregator extends DecimalBufferAggregator
++{
++
++  public DecimalMinBufferAggregator(ObjectColumnSelector selector)
++  {
++    super(selector);
++  }
++
++  /**
++   * Seeds the slot at the given position with Long.MAX_VALUE.
++   *
++   * NOTE(review): an input larger than Long.MAX_VALUE can never replace
++   * this seed, so the minimum of such inputs would be reported wrongly.
++   */
++  @Override
++  public void init(ByteBuffer buf, int position)
++  {
++    writeToBuffer(BigDecimal.valueOf(Long.MAX_VALUE), buf, position);
++  }
++
++  /**
++   * Folds the selector's current value into the stored minimum.
++   *
++   * A null value is skipped; previously it caused a NullPointerException
++   * inside compareTo.
++   */
++  @Override
++  public void aggregate(ByteBuffer buf, int position)
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      return;
++    }
++
++    BigDecimal min = readTFromBuffer(buf, position);
++
++    if (min.compareTo(value) > 0) {
++      min = value;
++    }
++
++    writeToBuffer(min, buf, position);
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java
+new file mode 100644
+index 000000000..b09146eb1
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalMinSerde.java
+@@ -0,0 +1,29 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++/**
++ * Serde for the decimal "min" metric. Everything except the reported type
++ * name is inherited from DecimalSerde.
++ */
++public class DecimalMinSerde extends DecimalSerde
++{
++  /** Returns DecimalDruidModule.DECIMALMIN as this serde's type name. */
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALMIN;
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java
+new file mode 100644
+index 000000000..ea674d0f4
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSerde.java
+@@ -0,0 +1,135 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.google.common.primitives.Ints;
++import io.druid.data.input.InputRow;
++import io.druid.java.util.common.IAE;
++import io.druid.segment.GenericColumnSerializer;
++import io.druid.segment.column.ColumnBuilder;
++import io.druid.segment.data.GenericIndexed;
++import io.druid.segment.data.IOPeon;
++import io.druid.segment.data.ObjectStrategy;
++import io.druid.segment.serde.ComplexColumnPartSupplier;
++import io.druid.segment.serde.ComplexMetricExtractor;
++import io.druid.segment.serde.ComplexMetricSerde;
++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
++import org.apache.commons.lang.ArrayUtils;
++
++import java.math.BigDecimal;
++import java.math.BigInteger;
++import java.nio.ByteBuffer;
++
++/**
++ * Base complex-metric serde for BigDecimal values.
++ *
++ * A value is encoded as the 4-byte scale followed by the byte array of the
++ * unscaled BigInteger; null is encoded as an empty array. Subclasses only
++ * supply the type name.
++ */
++public abstract class DecimalSerde extends ComplexMetricSerde
++{
++
++  public DecimalSerde()
++  {
++  }
++
++  /**
++   * Returns an extractor that accepts raw row values that already are
++   * BigDecimal instances.
++   */
++  @Override
++  public ComplexMetricExtractor getExtractor()
++  {
++    return new ComplexMetricExtractor()
++    {
++      @Override
++      public Class<BigDecimal> extractedClass()
++      {
++        return BigDecimal.class;
++      }
++
++      /**
++       * @throws IAE when the raw value is missing or not a BigDecimal; a
++       *             missing value used to surface as a NullPointerException
++       */
++      @Override
++      public BigDecimal extractValue(InputRow inputRow, String metricName)
++      {
++        Object rawValue = inputRow.getRaw(metricName);
++
++        if (rawValue instanceof BigDecimal) {
++          return (BigDecimal) rawValue;
++        }
++
++        throw new IAE(
++            "Metric[%s] must be a BigDecimal, got %s",
++            metricName,
++            rawValue == null ? "null" : rawValue.getClass().getName()
++        );
++      }
++    };
++  }
++
++  @Override
++  public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
++  {
++    final GenericIndexed column =
++        GenericIndexed.read(byteBuffer, getObjectStrategy());
++    columnBuilder.setComplexColumn(
++        new ComplexColumnPartSupplier(getTypeName(), column)
++    );
++  }
++
++  /** Returns the strategy that converts BigDecimal values to and from bytes. */
++  @Override
++  public ObjectStrategy getObjectStrategy()
++  {
++    return new ObjectStrategy()
++    {
++      @Override
++      public Class getClazz()
++      {
++        return BigDecimal.class;
++      }
++
++      /**
++       * Decodes numBytes bytes starting at the buffer's position without
++       * moving the caller's position (a read-only view is used).
++       */
++      @Override
++      public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
++      {
++        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
++        readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
++
++        // Layout: scale first, then the unscaled value fills the rest.
++        int scale = readOnlyBuffer.getInt();
++        byte[] bytes = new byte[readOnlyBuffer.remaining()];
++        readOnlyBuffer.get(bytes, 0, bytes.length);
++
++        return new BigDecimal(new BigInteger(bytes), scale);
++      }
++
++      /**
++       * @throws IAE when val is neither null nor a BigDecimal
++       */
++      @Override
++      public byte[] toBytes(Object val)
++      {
++        if (val == null) {
++          return new byte[]{};
++        }
++
++        if (val instanceof BigDecimal) {
++          BigDecimal value = (BigDecimal) val;
++
++          byte[] scaleBytes = Ints.toByteArray(value.scale());
++          byte[] bytes = value.unscaledValue().toByteArray();
++
++          return ArrayUtils.addAll(scaleBytes, bytes);
++        } else {
++          throw new IAE("Unknown class[%s], toString[%s]", val.getClass(), val);
++        }
++      }
++
++      /**
++       * Orders by BigDecimal.compareTo; null sorts before any value so that
++       * values accepted by toBytes can also be compared.
++       */
++      @Override
++      public int compare(Object o1, Object o2)
++      {
++        if (o1 == null) {
++          return o2 == null ? 0 : -1;
++        }
++        if (o2 == null) {
++          return 1;
++        }
++        return ((BigDecimal) o1).compareTo((BigDecimal) o2);
++      }
++    };
++  }
++
++  @Override
++  public GenericColumnSerializer getSerializer(IOPeon peon, String column)
++  {
++    return LargeColumnSupportedComplexColumnSerializer.create(
++        peon,
++        column,
++        this.getObjectStrategy()
++    );
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java
+new file mode 100644
+index 000000000..f381b47dc
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregator.java
+@@ -0,0 +1,80 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.query.aggregation.Aggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++
++/**
++ * On-heap aggregator that adds up the BigDecimal values produced by an
++ * object selector, starting from zero.
++ */
++public class DecimalSumAggregator implements Aggregator
++{
++
++  private final ObjectColumnSelector selector;
++  private BigDecimal sum = BigDecimal.ZERO;
++
++  public DecimalSumAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Adds the selector's current value to the running sum.
++   *
++   * Null values are ignored; previously they caused a NullPointerException
++   * inside BigDecimal.add.
++   */
++  @Override
++  public void aggregate()
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      return;
++    }
++
++    sum = sum.add(value);
++  }
++
++  /** Resets the running sum to zero. */
++  @Override
++  public void reset()
++  {
++    sum = BigDecimal.ZERO;
++  }
++
++  /** Returns the running sum as a BigDecimal. */
++  @Override
++  public Object get()
++  {
++    return sum;
++  }
++
++  @Override
++  public void close()
++  {
++  }
++
++  @Override
++  public float getFloat()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalSumAggregator does not support getFloat()"
++    );
++  }
++
++  @Override
++  public long getLong()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalSumAggregator does not support getLong()"
++    );
++  }
++
++  @Override
++  public double getDouble()
++  {
++    throw new UnsupportedOperationException(
++        "DecimalSumAggregator does not support getDouble()"
++    );
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java
+new file mode 100644
+index 000000000..329609341
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumAggregatorFactory.java
+@@ -0,0 +1,143 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.annotation.JsonCreator;
++import com.fasterxml.jackson.annotation.JsonProperty;
++
++import io.druid.java.util.common.IAE;
++import io.druid.java.util.common.StringUtils;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.aggregation.AggregatorUtil;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ColumnSelectorFactory;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.List;
++
++/**
++ * Aggregator factory that computes the sum of a BigDecimal metric.
++ */
++public class DecimalSumAggregatorFactory extends DecimalAggregatorFactory
++{
++  private static final byte CACHE_TYPE_ID = 27;
++
++  @JsonCreator
++  public DecimalSumAggregatorFactory(
++      @JsonProperty("name") String name,
++      @JsonProperty("fieldName") String fieldName,
++      @JsonProperty("precision") Integer precision
++  )
++  {
++    super(name, fieldName, precision);
++  }
++
++  /**
++   * Creates the on-heap aggregator for fieldName.
++   *
++   * @throws IAE when the column factory yields no selector for fieldName
++   */
++  @Override
++  public Aggregator factorize(ColumnSelectorFactory columnFactory)
++  {
++    return new DecimalSumAggregator(requireSelector(columnFactory));
++  }
++
++  /**
++   * Creates the buffer aggregator for fieldName.
++   *
++   * @throws IAE when no selector is available (previously a
++   *             NullPointerException) or when the selector's declared class
++   *             is neither Object nor assignable to BigDecimal
++   */
++  @Override
++  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = requireSelector(columnFactory);
++
++    final Class<?> classOfObject = selector.classOfObject();
++    final boolean untyped = classOfObject.equals(Object.class);
++    if (!untyped && !BigDecimal.class.isAssignableFrom(classOfObject)) {
++      throw new IAE(
++          "Incompatible type for metric[%s], expected a BigDecimal, got a %s",
++          fieldName,
++          classOfObject
++      );
++    }
++
++    return new DecimalSumBufferAggregator(selector);
++  }
++
++  /** Looks up the object selector for fieldName, failing fast when absent. */
++  private ObjectColumnSelector requireSelector(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector =
++        columnFactory.makeObjectColumnSelector(fieldName);
++    if (selector == null) {
++      throw new IAE(
++          "selector in DecimalSumAggregatorFactory should not be null, field[%s]",
++          fieldName
++      );
++    }
++    return selector;
++  }
++
++  /**
++   * Combines two intermediate results by adding them.
++   *
++   * A null operand is treated as absent and the other operand is returned
++   * instead of failing with a NullPointerException.
++   */
++  @Override
++  public Object combine(Object o1, Object o2)
++  {
++    if (o1 == null) {
++      return o2;
++    }
++    if (o2 == null) {
++      return o1;
++    }
++
++    BigDecimal left = (BigDecimal) o1;
++    BigDecimal right = (BigDecimal) o2;
++
++    return left.add(right);
++  }
++
++  /**
++   * Returns a factory that uses this factory's output name as both its name
++   * and its input field.
++   */
++  @Override
++  public AggregatorFactory getCombiningFactory()
++  {
++    return new DecimalSumAggregatorFactory(name, name, precision);
++  }
++
++  /**
++   * Returns a single-element list with a factory named after the input
++   * field.
++   */
++  @Override
++  public List<AggregatorFactory> getRequiredColumns()
++  {
++    final AggregatorFactory perField =
++        new DecimalSumAggregatorFactory(fieldName, fieldName, precision);
++    return Arrays.<AggregatorFactory>asList(perField);
++  }
++
++  /**
++   * Builds the cache key: type id byte, UTF-8 bytes of fieldName, separator.
++   *
++   * NOTE(review): precision is not part of the key - confirm that factories
++   * differing only in precision may share cached results.
++   */
++  @Override
++  public byte[] getCacheKey()
++  {
++    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
++    return ByteBuffer.allocate(2 + fieldNameBytes.length)
++                     .put(CACHE_TYPE_ID)
++                     .put(fieldNameBytes)
++                     .put(AggregatorUtil.STRING_SEPARATOR)
++                     .array();
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALSUM;
++  }
++
++  /**
++   * Equal when of the same class with equal name, fieldName and precision.
++   */
++  @Override
++  public boolean equals(Object o)
++  {
++    if (this == o) {
++      return true;
++    }
++    if (o == null || getClass() != o.getClass()) {
++      return false;
++    }
++
++    DecimalSumAggregatorFactory that = (DecimalSumAggregatorFactory) o;
++
++    return java.util.Objects.equals(fieldName, that.fieldName)
++           && java.util.Objects.equals(name, that.name)
++           && java.util.Objects.equals(precision, that.precision);
++  }
++
++  /** Hash over the same fields equals compares, keeping the two in sync. */
++  @Override
++  public int hashCode()
++  {
++    return java.util.Objects.hash(fieldName, name, precision);
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DecimalSumAggregatorFactory {"
++           + "name='" + name + '\''
++           + ", fieldName='" + fieldName + '\''
++           + '}';
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java
+new file mode 100644
+index 000000000..69f510eb1
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumBufferAggregator.java
+@@ -0,0 +1,54 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import io.druid.segment.ObjectColumnSelector;
++
++import java.math.BigDecimal;
++import java.nio.ByteBuffer;
++
++/**
++ * Buffer aggregator that accumulates the sum of BigDecimal values.
++ *
++ * The running sum is stored and loaded through the writeToBuffer and
++ * readTFromBuffer helpers inherited from DecimalBufferAggregator.
++ */
++public class DecimalSumBufferAggregator extends DecimalBufferAggregator
++{
++
++  public DecimalSumBufferAggregator(ObjectColumnSelector selector)
++  {
++    super(selector);
++  }
++
++  /** Seeds the slot at the given position with zero. */
++  @Override
++  public void init(ByteBuffer buf, int position)
++  {
++    writeToBuffer(BigDecimal.ZERO, buf, position);
++  }
++
++  /**
++   * Adds the selector's current value to the stored sum.
++   *
++   * A null value is skipped; previously it caused a NullPointerException
++   * inside BigDecimal.add.
++   */
++  @Override
++  public void aggregate(ByteBuffer buf, int position)
++  {
++    BigDecimal value = (BigDecimal) selector.getObject();
++    if (value == null) {
++      return;
++    }
++
++    BigDecimal oldSum = readTFromBuffer(buf, position);
++
++    writeToBuffer(oldSum.add(value), buf, position);
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java
 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java
+new file mode 100644
+index 000000000..d330bae3d
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/java/io/druid/query/aggregation/decimal/DecimalSumSerde.java
+@@ -0,0 +1,29 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++/**
++ * Serde for the decimal "sum" metric. Everything except the reported type
++ * name is inherited from DecimalSerde.
++ */
++public class DecimalSumSerde extends DecimalSerde
++{
++  /** Returns DecimalDruidModule.DECIMALSUM as this serde's type name. */
++  @Override
++  public String getTypeName()
++  {
++    return DecimalDruidModule.DECIMALSUM;
++  }
++}
+diff --git 
a/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
 
b/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+new file mode 100644
+index 000000000..86c36b366
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+@@ -0,0 +1 @@
++io.druid.query.aggregation.decimal.DecimalDruidModule
+diff --git 
a/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java
 
b/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java
+new file mode 100644
+index 000000000..e7207fa2e
+--- /dev/null
++++ 
b/extensions-contrib/decimal/src/test/java/io.druid.query.aggregation.decimal/DecimalGroupByQueryTest.java
+@@ -0,0 +1,250 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.decimal;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.Lists;
++import io.druid.data.input.MapBasedInputRow;
++import io.druid.data.input.Row;
++import io.druid.jackson.DefaultObjectMapper;
++import io.druid.java.util.common.Intervals;
++import io.druid.query.QueryRunnerTestHelper;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.dimension.DefaultDimensionSpec;
++import io.druid.query.dimension.DimensionSpec;
++import io.druid.query.groupby.GroupByQuery;
++import io.druid.query.groupby.GroupByQueryConfig;
++import io.druid.query.groupby.GroupByQueryRunnerFactory;
++import io.druid.query.groupby.GroupByQueryRunnerTest;
++import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
++import io.druid.query.groupby.orderby.DefaultLimitSpec;
++import io.druid.query.groupby.orderby.OrderByColumnSpec;
++import io.druid.segment.IndexIO;
++import io.druid.segment.IndexMerger;
++import io.druid.segment.IndexMergerTestBase;
++import io.druid.segment.IndexMergerV9;
++import io.druid.segment.IndexSpec;
++import io.druid.segment.QueryableIndex;
++import io.druid.segment.QueryableIndexSegment;
++import io.druid.segment.TestHelper;
++import io.druid.segment.column.ColumnConfig;
++import io.druid.segment.data.CompressedObjectStrategy;
++import io.druid.segment.data.CompressionFactory;
++import io.druid.segment.data.ConciseBitmapSerdeFactory;
++import io.druid.segment.incremental.IncrementalIndex;
++import io.druid.segment.incremental.IncrementalIndexSchema;
++import io.druid.segment.serde.ComplexMetrics;
++import org.apache.commons.io.FileUtils;
++import org.junit.Test;
++
++import java.io.File;
++import java.math.BigDecimal;
++import java.util.Arrays;
++import java.util.List;
++
++/**
++ * Round-trip test for the decimal sum/max/min aggregators: three rows are added to an
++ * on-heap incremental index, persisted as a segment, loaded back and queried with a
++ * groupBy on {@code client_type}.
++ */
++public class DecimalGroupByQueryTest
++{
++
++  private static final String VISITOR_ID = "visitor_id";
++  private static final String CLIENT_TYPE = "client_type";
++  private static final String SUM = "sum";
++  private static final String MAX = "max";
++  private static final String MIN = "min";
++
++  /**
++   * Fixed event time (2018-01-01T00:00:00Z). It must lie inside the 1970/2020 interval
++   * used for persisting below; a wall-clock timestamp falls outside it from 2020 onwards.
++   */
++  private static final long ROW_TIMESTAMP = 1514764800000L;
++
++  private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
++      new ConciseBitmapSerdeFactory(),
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressionFactory.LongEncodingStrategy.LONGS
++  );
++  private static IndexMerger INDEX_MERGER;
++
++  private static ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
++
++  private static IndexIO INDEX_IO;
++
++  static {
++    // Register the three decimal serdes unless something else already did.
++    if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALSUM) == null) {
++      ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALSUM, new DecimalSumSerde());
++    }
++
++    if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALMIN) == null) {
++      ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALMIN, new DecimalMinSerde());
++    }
++
++    if (ComplexMetrics.getSerdeForType(DecimalDruidModule.DECIMALMAX) == null) {
++      ComplexMetrics.registerSerde(DecimalDruidModule.DECIMALMAX, new DecimalMaxSerde());
++    }
++
++    for (Module mod : new DecimalDruidModule().getJacksonModules()) {
++      JSON_MAPPER.registerModule(mod);
++    }
++
++    INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig()
++    {
++      @Override
++      public int columnCacheSizeBytes()
++      {
++        return 0;
++      }
++    });
++
++    INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
++  }
++
++  /**
++   * Builds one input row in which the sum, max and min metrics all carry {@code value}.
++   */
++  private static MapBasedInputRow decimalRow(String clientType, BigDecimal value)
++  {
++    return new MapBasedInputRow(
++        ROW_TIMESTAMP,
++        Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
++        ImmutableMap.<String, Object>of(
++            VISITOR_ID, "0",
++            CLIENT_TYPE, clientType,
++            SUM, value,
++            MAX, value,
++            MIN, value
++        )
++    );
++  }
++
++  // NOTE(review): the method name looks copied from the distinct-count test; it is kept
++  // unchanged here so that existing test selections by name keep working.
++  @Test
++  public void testGroupByWithDistinctCountAgg() throws Exception
++  {
++    final GroupByQueryConfig config = new GroupByQueryConfig();
++    config.setMaxIntermediateRows(100000000);
++    final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
++
++    IncrementalIndex index = new IncrementalIndex.Builder()
++        .setIndexSchema(
++            new IncrementalIndexSchema.Builder()
++                .withMetrics(new AggregatorFactory[]{
++                    new DecimalSumAggregatorFactory(SUM, SUM, 100),
++                    new DecimalMaxAggregatorFactory(MAX, MAX, 100),
++                    new DecimalMinAggregatorFactory(MIN, MIN, 100),
++                    })
++                .withRollup(false)
++                .build()
++        )
++        .setReportParseExceptions(false)
++        .setConcurrentEventAdd(true)
++        .setMaxRowCount(1000000)
++        .buildOnheap();
++
++    // String constructor: new BigDecimal(0.8888) would capture the binary expansion of
++    // the double rather than the decimal value written in the source.
++    final BigDecimal decimal1 = new BigDecimal("2");
++    final BigDecimal decimal2 = new BigDecimal("3");
++    final BigDecimal decimal3 = new BigDecimal("0.8888");
++
++    index.add(decimalRow("iphone", decimal1));
++    index.add(decimalRow("iphone", decimal2));
++    index.add(decimalRow("android", decimal3));
++
++    final File finalFile = new File("tmp");
++    try {
++      INDEX_MERGER.persist(
++          index,
++          Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"),
++          finalFile,
++          INDEX_SPEC
++      );
++
++      QueryableIndex queryableIndex = INDEX_IO.loadIndex(finalFile);
++      QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex);
++
++      GroupByQuery query = new GroupByQuery.Builder()
++          .setDataSource(QueryRunnerTestHelper.dataSource)
++          .setGranularity(QueryRunnerTestHelper.allGran)
++          .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec(CLIENT_TYPE, CLIENT_TYPE)))
++          .setInterval(QueryRunnerTestHelper.fullOnInterval)
++          .setLimitSpec(
++              new DefaultLimitSpec(
++                  Lists.newArrayList(
++                      new OrderByColumnSpec(CLIENT_TYPE, OrderByColumnSpec.Direction.DESCENDING)
++                  ),
++                  10
++              )
++          )
++          .setAggregatorSpecs(Lists.newArrayList(
++              new DecimalSumAggregatorFactory(SUM, SUM, 100),
++              new DecimalMaxAggregatorFactory(MAX, MAX, 100),
++              new DecimalMinAggregatorFactory(MIN, MIN, 100)
++          ))
++          .build();
++
++      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, factory.createRunner(segment), query);
++
++      BigDecimal decimalSum1 = decimal1.add(decimal2);
++
++      List<Row> expectedResults = Arrays.asList(
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              CLIENT_TYPE, "iphone",
++              SUM, decimalSum1,
++              MAX, decimal2,
++              MIN, decimal1
++          ),
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              CLIENT_TYPE, "android",
++              SUM, decimal3,
++              MAX, decimal3,
++              MIN, decimal3
++          )
++      );
++
++      TestHelper.assertExpectedObjects(expectedResults, results, "decimal");
++    }
++    finally {
++      // Remove the scratch segment directory even when persisting or an assertion fails.
++      FileUtils.deleteDirectory(finalFile);
++    }
++  }
++}
+diff --git a/extensions-contrib/kylin-distinccount/pom.xml 
b/extensions-contrib/kylin-distinccount/pom.xml
+new file mode 100644
+index 000000000..a2031f4e5
+--- /dev/null
++++ b/extensions-contrib/kylin-distinccount/pom.xml
+@@ -0,0 +1,59 @@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ ~ or more contributor license agreements. See the NOTICE file
++ ~ distributed with this work for additional information
++ ~ regarding copyright ownership. Metamarkets licenses this file
++ ~ to you under the Apache License, Version 2.0 (the
++ ~ "License"); you may not use this file except in compliance
++ ~ with the License. You may obtain a copy of the License at
++ ~
++ ~ http://www.apache.org/licenses/LICENSE-2.0
++ ~
++ ~ Unless required by applicable law or agreed to in writing,
++ ~ software distributed under the License is distributed on an
++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ ~ KIND, either express or implied. See the License for the
++ ~ specific language governing permissions and limitations
++ ~ under the License.
++  -->
++
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
++  <modelVersion>4.0.0</modelVersion>
++
++  <groupId>io.druid.extensions.contrib</groupId>
++  <artifactId>druid-kylin-distinctcount</artifactId>
++  <name>druid-kylin-distinctcount</name>
++  <description>druid-kylin-distinctcount</description>
++
++  <parent>
++    <groupId>io.druid</groupId>
++    <artifactId>druid</artifactId>
++    <version>0.11.1-SNAPSHOT</version>
++    <relativePath>../../pom.xml</relativePath>
++  </parent>
++
++  <dependencies>
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>provided</scope>
++    </dependency>
++
++    <!-- Tests -->
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <dependency>
++      <groupId>junit</groupId>
++      <artifactId>junit</artifactId>
++      <scope>test</scope>
++    </dependency>
++  </dependencies>
++
++</project>
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java
 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java
+new file mode 100644
+index 000000000..48db064d9
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregator.java
+@@ -0,0 +1,73 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import io.druid.collections.bitmap.ImmutableBitmap;
++import io.druid.collections.bitmap.RoaringBitmapFactory;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++/**
++ * On-heap aggregator that unions every bitmap read from the selector into one
++ * running bitmap, which {@link #get()} returns.
++ */
++public class DistinctCountAggregator implements Aggregator
++{
++
++  private final ObjectColumnSelector selector;
++  private ImmutableBitmap bitmap = emptyBitmap();
++
++  /**
++   * @param selector supplies the per-row value; it is cast to {@link ImmutableBitmap}
++   */
++  public DistinctCountAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  private static ImmutableBitmap emptyBitmap()
++  {
++    return new RoaringBitmapFactory().makeEmptyImmutableBitmap();
++  }
++
++  /**
++   * Unions the current row's bitmap into the running result. A row without a value
++   * contributes nothing, matching the buffer aggregator of this extension.
++   */
++  @Override
++  public void aggregate()
++  {
++    final ImmutableBitmap update = (ImmutableBitmap) selector.getObject();
++    if (update == null) {
++      return;
++    }
++    bitmap = bitmap.union(update);
++  }
++
++  /**
++   * Restores the initial empty state. Assigning {@code null} here would make the
++   * next {@link #aggregate()} call fail with a NullPointerException.
++   */
++  @Override
++  public void reset()
++  {
++    bitmap = emptyBitmap();
++  }
++
++  /**
++   * @return the union of all bitmaps aggregated so far
++   */
++  @Override
++  public Object get()
++  {
++    return bitmap;
++  }
++
++  /**
++   * Drops the reference to the accumulated bitmap.
++   */
++  @Override
++  public void close()
++  {
++    bitmap = null;
++  }
++
++  @Override
++  public float getFloat()
++  {
++    throw new UnsupportedOperationException("DistinctCountAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong()
++  {
++    throw new UnsupportedOperationException("DistinctCountAggregator does not support getLong()");
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java
 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java
+new file mode 100644
+index 000000000..03f04b3ff
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountAggregatorFactory.java
+@@ -0,0 +1,223 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import com.fasterxml.jackson.annotation.JsonCreator;
++import com.fasterxml.jackson.annotation.JsonProperty;
++import com.google.common.base.Preconditions;
++
++import io.druid.collections.bitmap.ImmutableBitmap;
++import io.druid.collections.bitmap.RoaringBitmapFactory;
++import io.druid.java.util.common.IAE;
++import io.druid.java.util.common.StringUtils;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.aggregation.AggregatorUtil;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ColumnSelectorFactory;
++import io.druid.segment.ObjectColumnSelector;
++import io.druid.segment.data.RoaringBitmapSerdeFactory;
++import org.apache.commons.codec.binary.Base64;
++
++import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.List;
++
++/**
++ * Aggregator factory for the {@code kylin-distinctCount} metric. Values are
++ * {@link ImmutableBitmap}s and combining two values is a bitmap union.
++ */
++public class DistinctCountAggregatorFactory extends AggregatorFactory
++{
++  private static final byte CACHE_TYPE_ID = 21;
++
++  private final String name;
++  private final String fieldName;
++
++  /**
++   * @param name      output name of the aggregation; must not be null
++   * @param fieldName input column holding the bitmaps; must not be null
++   */
++  @JsonCreator
++  public DistinctCountAggregatorFactory(@JsonProperty("name") String name, @JsonProperty("fieldName") String fieldName)
++  {
++    Preconditions.checkNotNull(name);
++    Preconditions.checkNotNull(fieldName);
++    this.name = name;
++    this.fieldName = fieldName;
++  }
++
++  /**
++   * Creates the object selector for {@link #fieldName}, rejecting a missing selector
++   * with the same error for both the on-heap and the buffered aggregator.
++   */
++  private ObjectColumnSelector requireSelector(ColumnSelectorFactory columnFactory)
++  {
++    final ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName);
++    if (selector == null) {
++      throw new IAE("selector in DistinctCountAggregatorFactory should not be Null");
++    }
++    return selector;
++  }
++
++  @Override
++  public Aggregator factorize(ColumnSelectorFactory columnFactory)
++  {
++    return new DistinctCountAggregator(requireSelector(columnFactory));
++  }
++
++  /**
++   * @throws IAE if no selector exists for the field or the column's object class is
++   *             neither {@code Object} nor an {@link ImmutableBitmap}
++   */
++  @Override
++  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
++  {
++    // Previously a null selector was dereferenced here, unlike in factorize().
++    final ObjectColumnSelector selector = requireSelector(columnFactory);
++
++    final Class classOfObject = selector.classOfObject();
++    if (!classOfObject.equals(Object.class) && !ImmutableBitmap.class.isAssignableFrom(classOfObject)) {
++      // The check is against ImmutableBitmap, so the message names that type.
++      throw new IAE("Incompatible type for metric[%s], expected an ImmutableBitmap, got a %s", fieldName, classOfObject);
++    }
++
++    return new DistinctCountBufferAggregator(selector);
++  }
++
++  /**
++   * @return a comparator that orders bitmaps with the Roaring bitmap object strategy
++   */
++  @Override
++  public Comparator getComparator()
++  {
++    return new Comparator()
++    {
++      @Override
++      public int compare(Object o, Object o1)
++      {
++        return new RoaringBitmapSerdeFactory(true).getObjectStrategy()
++                                                  .compare((ImmutableBitmap) o, (ImmutableBitmap) o1);
++      }
++    };
++  }
++
++  /**
++   * Unions two bitmaps; when one side is null the other side is returned unchanged.
++   */
++  @Override
++  public Object combine(Object lhs, Object rhs)
++  {
++    if (rhs == null) {
++      return lhs;
++    }
++    if (lhs == null) {
++      return rhs;
++    }
++
++    return ((ImmutableBitmap) lhs).union((ImmutableBitmap) rhs);
++  }
++
++  /**
++   * @return a factory that reads from the column named after this factory's output
++   */
++  @Override
++  public AggregatorFactory getCombiningFactory()
++  {
++    return new DistinctCountAggregatorFactory(name, name);
++  }
++
++  @Override
++  public List<AggregatorFactory> getRequiredColumns()
++  {
++    return Arrays.<AggregatorFactory>asList(new DistinctCountAggregatorFactory(fieldName, fieldName));
++  }
++
++  /**
++   * Turns a Base64 string, a byte array or a ByteBuffer into an immutable Roaring
++   * bitmap mapped over those bytes; any other object is returned as is.
++   */
++  @Override
++  public Object deserialize(Object object)
++  {
++    final ByteBuffer buffer;
++    if (object instanceof String) {
++      buffer = ByteBuffer.wrap(Base64.decodeBase64(StringUtils.toUtf8((String) object)));
++    } else if (object instanceof byte[]) {
++      buffer = ByteBuffer.wrap((byte[]) object);
++    } else if (object instanceof ByteBuffer) {
++      // Be conservative, don't assume we own this buffer.
++      buffer = ((ByteBuffer) object).duplicate();
++    } else {
++      return object;
++    }
++    return new RoaringBitmapFactory().mapImmutableBitmap(buffer);
++  }
++
++  /**
++   * @return the bitmap itself; no finalization is applied
++   */
++  @Override
++  public Object finalizeComputation(Object object)
++  {
++    return object;
++  }
++
++  @JsonProperty
++  public String getFieldName()
++  {
++    return fieldName;
++  }
++
++  @Override
++  @JsonProperty
++  public String getName()
++  {
++    return name;
++  }
++
++  @Override
++  public List<String> requiredFields()
++  {
++    return Arrays.asList(fieldName);
++  }
++
++  /**
++   * Cache key layout: type id byte, UTF-8 field name, string separator byte.
++   */
++  @Override
++  public byte[] getCacheKey()
++  {
++    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
++    return ByteBuffer.allocate(2 + fieldNameBytes.length)
++                     .put(CACHE_TYPE_ID)
++                     .put(fieldNameBytes)
++                     .put(AggregatorUtil.STRING_SEPARATOR)
++                     .array();
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return DistinctCountDruidModule.DISTINCT_COUNT;
++  }
++
++  @Override
++  public int getMaxIntermediateSize()
++  {
++    return 4;
++  }
++
++  @Override
++  public boolean equals(Object o)
++  {
++    if (this == o) {
++      return true;
++    }
++    if (o == null || getClass() != o.getClass()) {
++      return false;
++    }
++
++    DistinctCountAggregatorFactory that = (DistinctCountAggregatorFactory) o;
++    return fieldName.equals(that.fieldName) && name.equals(that.name);
++  }
++
++  @Override
++  public int hashCode()
++  {
++    int result = name.hashCode();
++    result = 31 * result + fieldName.hashCode();
++    return result;
++  }
++
++  @Override
++  public String toString()
++  {
++    return "DistinctCountAggregatorFactory{" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}';
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java
 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java
+new file mode 100644
+index 000000000..1693972d0
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountBufferAggregator.java
+@@ -0,0 +1,120 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import io.druid.collections.bitmap.ImmutableBitmap;
++import io.druid.collections.bitmap.RoaringBitmapFactory;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ObjectColumnSelector;
++import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
++import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
++
++import java.nio.ByteBuffer;
++import java.util.IdentityHashMap;
++
++/**
++ * Buffer aggregator that keeps its state on the heap: the running bitmap for each
++ * (buffer, position) slot lives in {@link #bitmaps}, keyed by buffer identity and
++ * then by position. Nothing is written into the buffers themselves.
++ */
++public class DistinctCountBufferAggregator implements BufferAggregator
++{
++  private final ObjectColumnSelector selector;
++  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<ImmutableBitmap>> bitmaps = new IdentityHashMap<>();
++
++  /**
++   * @param selector supplies the per-row value; it is cast to {@link ImmutableBitmap}
++   */
++  public DistinctCountBufferAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Starts the slot with an empty bitmap.
++   */
++  @Override
++  public void init(ByteBuffer buf, int position)
++  {
++    createNewBitmap(buf, position);
++  }
++
++  /**
++   * Unions the current row's bitmap into the slot; a null row value is skipped.
++   */
++  @Override
++  public void aggregate(ByteBuffer buf, int position)
++  {
++    ImmutableBitmap updateBitmap = (ImmutableBitmap) selector.getObject();
++    if (updateBitmap == null) {
++      return;
++    }
++
++    ImmutableBitmap oldBitmap = getBitmap(buf, position);
++    ImmutableBitmap mergedBitmap = oldBitmap.union(updateBitmap);
++    bitmaps.get(buf).put(position, mergedBitmap);
++  }
++
++  /**
++   * Stores a fresh empty bitmap for the slot, creating the per-buffer map on first use.
++   */
++  private ImmutableBitmap createNewBitmap(ByteBuffer buf, int position)
++  {
++    ImmutableBitmap immutableBitmap = new RoaringBitmapFactory().makeEmptyImmutableBitmap();
++    Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(buf);
++    if (bitmapMap == null) {
++      bitmapMap = new Int2ObjectOpenHashMap<>();
++      bitmaps.put(buf, bitmapMap);
++    }
++    bitmapMap.put(position, immutableBitmap);
++    return immutableBitmap;
++  }
++
++  //Note that this is not threadsafe and I don't think it needs to be
++  private ImmutableBitmap getBitmap(ByteBuffer buf, int position)
++  {
++    Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(buf);
++    ImmutableBitmap bitmap = bitmapMap != null ? bitmapMap.get(position) : null;
++    if (bitmap != null) {
++      return bitmap;
++    }
++    // Slot not seen yet (or holding null): start it empty.
++    return createNewBitmap(buf, position);
++  }
++
++  @Override
++  public Object get(ByteBuffer buf, int position)
++  {
++    return getBitmap(buf, position);
++  }
++
++  @Override
++  public float getFloat(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("DistinctCountBufferAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("DistinctCountBufferAggregator does not support getLong()");
++  }
++
++  /**
++   * Moves the slot's bitmap to its new (buffer, position) key and forgets the old key;
++   * the old buffer's map is dropped once it holds no more slots.
++   */
++  @Override
++  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
++  {
++    createNewBitmap(newBuffer, newPosition);
++    Int2ObjectMap<ImmutableBitmap> bitmapMap = bitmaps.get(oldBuffer);
++    if (bitmapMap != null) {
++      bitmaps.get(newBuffer).put(newPosition, bitmapMap.get(oldPosition));
++      bitmapMap.remove(oldPosition);
++      if (bitmapMap.isEmpty()) {
++        bitmaps.remove(oldBuffer);
++      }
++    }
++  }
++
++  /**
++   * Releases the on-heap bitmaps and the buffer references used as keys, so a closed
++   * aggregator no longer keeps them reachable.
++   */
++  @Override
++  public void close()
++  {
++    bitmaps.clear();
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java
 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java
+new file mode 100644
+index 000000000..c8e729ba7
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountDruidModule.java
+@@ -0,0 +1,53 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.jsontype.NamedType;
++import com.fasterxml.jackson.databind.module.SimpleModule;
++import com.google.common.collect.ImmutableList;
++import com.google.inject.Binder;
++import io.druid.initialization.DruidModule;
++import io.druid.segment.serde.ComplexMetrics;
++
++import java.util.List;
++
++/**
++ * Druid module for the Kylin distinct-count extension. It exposes the aggregator
++ * factory to Jackson under {@link #DISTINCT_COUNT} and registers the matching
++ * complex-metric serde.
++ */
++public class DistinctCountDruidModule implements DruidModule
++{
++  public static final String DISTINCT_COUNT = "kylin-distinctCount";
++
++  /**
++   * @return a single Jackson module mapping {@link #DISTINCT_COUNT} to
++   *         {@link DistinctCountAggregatorFactory}
++   */
++  @Override
++  public List<? extends Module> getJacksonModules()
++  {
++    final SimpleModule jacksonModule = new SimpleModule("KylinDistinctCountModule");
++    final NamedType factorySubtype = new NamedType(DistinctCountAggregatorFactory.class, DISTINCT_COUNT);
++    jacksonModule.registerSubtypes(factorySubtype);
++    return ImmutableList.of(jacksonModule);
++  }
++
++  /**
++   * Registers the serde for {@link #DISTINCT_COUNT} unless one is already present.
++   */
++  @Override
++  public void configure(Binder binder)
++  {
++    if (ComplexMetrics.getSerdeForType(DISTINCT_COUNT) != null) {
++      return;
++    }
++    ComplexMetrics.registerSerde(DISTINCT_COUNT, new DistinctCountSerde());
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java
 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java
+new file mode 100644
+index 000000000..3559ae1e0
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/java/io/druid/query/aggregation/kylin.distinctcount/DistinctCountSerde.java
+@@ -0,0 +1,95 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import io.druid.collections.bitmap.ImmutableBitmap;
++import io.druid.data.input.InputRow;
++import io.druid.java.util.common.IAE;
++import io.druid.segment.GenericColumnSerializer;
++import io.druid.segment.column.ColumnBuilder;
++import io.druid.segment.data.GenericIndexed;
++import io.druid.segment.data.IOPeon;
++import io.druid.segment.data.ObjectStrategy;
++import io.druid.segment.data.RoaringBitmapSerdeFactory;
++import io.druid.segment.serde.ComplexColumnPartSupplier;
++import io.druid.segment.serde.ComplexMetricExtractor;
++import io.druid.segment.serde.ComplexMetricSerde;
++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
++
++import java.nio.ByteBuffer;
++
++/**
++ * Complex-metric serde for the {@code kylin-distinctCount} type. Column values are
++ * {@link ImmutableBitmap}s read and written with the Roaring bitmap object strategy.
++ */
++public class DistinctCountSerde extends ComplexMetricSerde
++{
++
++  public DistinctCountSerde()
++  {
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return DistinctCountDruidModule.DISTINCT_COUNT;
++  }
++
++  /**
++   * @return an extractor that passes through raw values that already are
++   *         {@link ImmutableBitmap}s and rejects everything else
++   */
++  @Override
++  public ComplexMetricExtractor getExtractor()
++  {
++    return new ComplexMetricExtractor()
++    {
++      @Override
++      public Class<ImmutableBitmap> extractedClass()
++      {
++        return ImmutableBitmap.class;
++      }
++
++      /**
++       * @throws IAE if the raw value is missing or is not an {@link ImmutableBitmap}
++       */
++      @Override
++      public ImmutableBitmap extractValue(InputRow inputRow, String metricName)
++      {
++        final Object rawValue = inputRow.getRaw(metricName);
++
++        if (rawValue instanceof ImmutableBitmap) {
++          return (ImmutableBitmap) rawValue;
++        }
++        // A missing value used to surface as a bare NullPointerException, and the
++        // message named MutableBitmap although ImmutableBitmap is what is required.
++        throw new IAE(
++            "Metric[%s] must be an ImmutableBitmap, got %s",
++            metricName,
++            rawValue == null ? "null" : rawValue.getClass().getName()
++        );
++      }
++    };
++  }
++
++  /**
++   * Reads the column as a {@link GenericIndexed} of bitmaps and attaches it to the builder.
++   */
++  @Override
++  public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
++  {
++    final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
++    columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
++  }
++
++  /**
++   * @return the object strategy of a {@code RoaringBitmapSerdeFactory} created with {@code true}
++   */
++  @Override
++  public ObjectStrategy getObjectStrategy()
++  {
++    RoaringBitmapSerdeFactory bitmapSerdeFactory = new RoaringBitmapSerdeFactory(true);
++    return bitmapSerdeFactory.getObjectStrategy();
++  }
++
++  @Override
++  public GenericColumnSerializer getSerializer(IOPeon peon, String column)
++  {
++    return LargeColumnSupportedComplexColumnSerializer.create(peon, column, this.getObjectStrategy());
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
 
b/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+new file mode 100644
+index 000000000..abad7df15
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+@@ -0,0 +1 @@
++io.druid.query.aggregation.kylin.distinctcount.DistinctCountDruidModule
+diff --git 
a/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java
 
b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java
+new file mode 100644
+index 000000000..8bfd9b60a
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountBufferAggregatorTest.java
+@@ -0,0 +1,71 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import com.google.common.collect.ImmutableMap;
++import io.druid.collections.bitmap.ImmutableBitmap;
++import io.druid.collections.bitmap.MutableBitmap;
++import io.druid.collections.bitmap.RoaringBitmapFactory;
++import io.druid.data.input.MapBasedRow;
++import io.druid.query.aggregation.AggregationTestHelper;
++import io.druid.query.groupby.GroupByQueryConfig;
++import io.druid.query.groupby.epinephelinae.GrouperTestUtil;
++import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory;
++import org.junit.Assert;
++import org.junit.Rule;
++import org.junit.Test;
++import org.junit.rules.TemporaryFolder;
++
++/**
++ * Runs the relocate verification of {@link AggregationTestHelper} against
++ * {@link DistinctCountAggregatorFactory} with a single-element bitmap as input.
++ */
++public class DistinctCountBufferAggregatorTest
++{
++  @Rule
++  public final TemporaryFolder tempFolder = new TemporaryFolder();
++
++  @Test
++  public void testRelocation()
++  {
++    final AggregationTestHelper testHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
++        new DistinctCountDruidModule().getJacksonModules(),
++        new GroupByQueryConfig(),
++        tempFolder
++    );
++    final TestColumnSelectorFactory selectorFactory = GrouperTestUtil.newColumnSelectorFactory();
++
++    // One row whose "kylin" column holds the bitmap {1}.
++    final RoaringBitmapFactory roaringFactory = new RoaringBitmapFactory(true);
++    final MutableBitmap single = roaringFactory.makeEmptyMutableBitmap();
++    single.add(1);
++    final ImmutableBitmap rowValue = roaringFactory.makeImmutableBitmap(single);
++    selectorFactory.setRow(new MapBasedRow(0, ImmutableMap.<String, Object>of("kylin", rowValue)));
++
++    final DistinctCountAggregatorFactory aggregatorFactory = new DistinctCountAggregatorFactory("kylin", "kylin");
++    final ImmutableBitmap[] relocated = testHelper.runRelocateVerificationTest(
++        aggregatorFactory,
++        selectorFactory,
++        ImmutableBitmap.class
++    );
++
++    // Both returned bitmaps must report the same cardinality.
++    Assert.assertEquals(relocated[0].size(), relocated[1].size(), 0);
++  }
++}
+diff --git 
a/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java
 
b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java
+new file mode 100644
+index 000000000..912189d57
+--- /dev/null
++++ 
b/extensions-contrib/kylin-distinccount/src/test/java/io.druid.query.aggregation.kylin.distinctcount/DistinctCountGroupByQueryTest.java
+@@ -0,0 +1,234 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.distinctcount;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.Lists;
++import io.druid.collections.bitmap.MutableBitmap;
++import io.druid.collections.bitmap.RoaringBitmapFactory;
++import io.druid.data.input.MapBasedInputRow;
++import io.druid.data.input.Row;
++import io.druid.jackson.DefaultObjectMapper;
++import io.druid.java.util.common.Intervals;
++import io.druid.query.QueryRunnerTestHelper;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.dimension.DefaultDimensionSpec;
++import io.druid.query.dimension.DimensionSpec;
++import io.druid.query.groupby.GroupByQuery;
++import io.druid.query.groupby.GroupByQueryConfig;
++import io.druid.query.groupby.GroupByQueryRunnerFactory;
++import io.druid.query.groupby.GroupByQueryRunnerTest;
++import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
++import io.druid.query.groupby.orderby.DefaultLimitSpec;
++import io.druid.query.groupby.orderby.OrderByColumnSpec;
++import io.druid.segment.IndexIO;
++import io.druid.segment.IndexMerger;
++import io.druid.segment.IndexMergerTestBase;
++import io.druid.segment.IndexMergerV9;
++import io.druid.segment.IndexSpec;
++import io.druid.segment.QueryableIndex;
++import io.druid.segment.QueryableIndexSegment;
++import io.druid.segment.TestHelper;
++import io.druid.segment.column.ColumnConfig;
++import io.druid.segment.data.CompressedObjectStrategy;
++import io.druid.segment.data.CompressionFactory;
++import io.druid.segment.data.ConciseBitmapSerdeFactory;
++import io.druid.segment.incremental.IncrementalIndex;
++import io.druid.segment.incremental.IncrementalIndexSchema;
++import io.druid.segment.serde.ComplexMetrics;
++import org.apache.commons.io.FileUtils;
++import org.junit.Test;
++
++import java.io.File;
++import java.util.Arrays;
++import java.util.List;
++
++/**
++ * End-to-end check of {@link DistinctCountAggregatorFactory} in a groupBy query: rows carrying
++ * bitmaps are indexed, persisted to disk, reloaded as a segment and grouped by client type.
++ */
++public class DistinctCountGroupByQueryTest
++{
++
++  private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
++      new ConciseBitmapSerdeFactory(),
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressionFactory.LongEncodingStrategy.LONGS
++  );
++  private static final IndexMerger INDEX_MERGER;
++
++  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
++
++  private static final IndexIO INDEX_IO;
++
++  static {
++    // The serde registry is process-wide, so only register when no other test has done so yet.
++    if (ComplexMetrics.getSerdeForType("kylin-distinctCount") == null) {
++      ComplexMetrics.registerSerde("kylin-distinctCount", new DistinctCountSerde());
++    }
++
++    for (Module mod : new DistinctCountDruidModule().getJacksonModules()) {
++      JSON_MAPPER.registerModule(mod);
++    }
++
++    INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig()
++    {
++      @Override
++      public int columnCacheSizeBytes()
++      {
++        return 0;
++      }
++    });
++
++    INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
++  }
++
++  /**
++   * Indexes two "iphone" rows (bitmaps {1,2,3} and {4}) and one "android" row (bitmap {1}),
++   * then expects the groupBy result to contain the union per client type.
++   */
++  @Test
++  public void testGroupByWithDistinctCountAgg() throws Exception
++  {
++    final GroupByQueryConfig config = new GroupByQueryConfig();
++    config.setMaxIntermediateRows(100000000);
++    final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
++
++    String visitor_id = "visitor_id";
++    String client_type = "client_type";
++    String kylin_distinctCount = "kylin";
++    long timestamp = System.currentTimeMillis();
++
++    IncrementalIndex index = new IncrementalIndex.Builder()
++        .setIndexSchema(
++            new IncrementalIndexSchema.Builder()
++                .withMetrics(new AggregatorFactory[]{
++                    new DistinctCountAggregatorFactory(
++                        "kylin",
++                        "kylin"
++                    )
++                })
++                .withRollup(false)
++                .build()
++        )
++        .setReportParseExceptions(false)
++        .setConcurrentEventAdd(true)
++        .setMaxRowCount(1000000)
++        .buildOnheap();
++
++    RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory(true);
++    MutableBitmap bitmap1 = bitmapFactory.makeEmptyMutableBitmap();
++    bitmap1.add(1);
++    bitmap1.add(2);
++    bitmap1.add(3);
++
++    MutableBitmap bitmap2 = bitmapFactory.makeEmptyMutableBitmap();
++    bitmap2.add(4);
++
++    MutableBitmap bitmap3 = bitmapFactory.makeEmptyMutableBitmap();
++    bitmap3.add(1);
++
++    index.add(new MapBasedInputRow(
++        timestamp,
++        Lists.newArrayList(visitor_id, client_type),
++        ImmutableMap.<String, Object>of(
++            visitor_id,
++            "0",
++            client_type,
++            "iphone",
++            kylin_distinctCount,
++            bitmapFactory.makeImmutableBitmap(bitmap1)
++        )
++    ));
++    index.add(new MapBasedInputRow(
++        timestamp,
++        Lists.newArrayList(visitor_id, client_type),
++        ImmutableMap.<String, Object>of(
++            visitor_id,
++            "0",
++            client_type,
++            "iphone",
++            kylin_distinctCount,
++            bitmapFactory.makeImmutableBitmap(bitmap2)
++        )
++    ));
++    index.add(new MapBasedInputRow(
++        timestamp,
++        Lists.newArrayList(visitor_id, client_type),
++        ImmutableMap.<String, Object>of(
++            visitor_id,
++            "0",
++            client_type,
++            "android",
++            kylin_distinctCount,
++            bitmapFactory.makeImmutableBitmap(bitmap3)
++        )
++    ));
++
++    final File finalFile = new File("tmp");
++    // Everything that touches the persisted directory runs inside try/finally so the directory
++    // is removed even when persisting, querying or the assertion fails.
++    try {
++      INDEX_MERGER.persist(
++          index,
++          Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"),
++          finalFile,
++          INDEX_SPEC
++      );
++
++      QueryableIndex queryableIndex = INDEX_IO.loadIndex(finalFile);
++      QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex);
++
++      GroupByQuery query = new GroupByQuery.Builder()
++          .setDataSource(QueryRunnerTestHelper.dataSource)
++          .setGranularity(QueryRunnerTestHelper.allGran)
++          .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec(
++              client_type,
++              client_type
++          )))
++          .setInterval(QueryRunnerTestHelper.fullOnInterval)
++          .setLimitSpec(
++              new DefaultLimitSpec(
++                  Lists.newArrayList(
++                      new OrderByColumnSpec(
++                          client_type,
++                          OrderByColumnSpec.Direction.DESCENDING
++                      )
++                  ), 10))
++          .setAggregatorSpecs(Lists.newArrayList(new DistinctCountAggregatorFactory(
++              "kylin",
++              "kylin"
++          )))
++          .build();
++
++      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, factory.createRunner(segment), query);
++
++      // Expected "iphone" value is the union of the two iphone bitmaps.
++      bitmap1.or(bitmap2);
++
++      List<Row> expectedResults = Arrays.asList(
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              client_type, "iphone",
++              "kylin", bitmap1
++          ),
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              client_type, "android",
++              "kylin", bitmap3
++          )
++      );
++
++      TestHelper.assertExpectedObjects(expectedResults, results, "distinct-count");
++    }
++    finally {
++      FileUtils.deleteDirectory(finalFile);
++    }
++  }
++}
+diff --git a/extensions-contrib/kylin-extendcolumn/pom.xml 
b/extensions-contrib/kylin-extendcolumn/pom.xml
+new file mode 100644
+index 000000000..50e192cd3
+--- /dev/null
++++ b/extensions-contrib/kylin-extendcolumn/pom.xml
+@@ -0,0 +1,59 @@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ ~ or more contributor license agreements. See the NOTICE file
++ ~ distributed with this work for additional information
++ ~ regarding copyright ownership. Metamarkets licenses this file
++ ~ to you under the Apache License, Version 2.0 (the
++ ~ "License"); you may not use this file except in compliance
++ ~ with the License. You may obtain a copy of the License at
++ ~
++ ~ http://www.apache.org/licenses/LICENSE-2.0
++ ~
++ ~ Unless required by applicable law or agreed to in writing,
++ ~ software distributed under the License is distributed on an
++ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ ~ KIND, either express or implied. See the License for the
++ ~ specific language governing permissions and limitations
++ ~ under the License.
++  -->
++
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
++  <modelVersion>4.0.0</modelVersion>
++
++  <groupId>io.druid.extensions.contrib</groupId>
++  <artifactId>druid-kylin-extendcolumn</artifactId>
++  <name>druid-kylin-extendcolumn</name>
++  <description>druid-kylin-extendcolumn</description>
++
++  <parent>
++    <groupId>io.druid</groupId>
++    <artifactId>druid</artifactId>
++    <version>0.11.1-SNAPSHOT</version>
++    <relativePath>../../pom.xml</relativePath>
++  </parent>
++
++  <dependencies>
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>provided</scope>
++    </dependency>
++
++    <!-- Tests -->
++    <dependency>
++      <groupId>io.druid</groupId>
++      <artifactId>druid-processing</artifactId>
++      <version>${project.parent.version}</version>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <dependency>
++      <groupId>junit</groupId>
++      <artifactId>junit</artifactId>
++      <scope>test</scope>
++    </dependency>
++  </dependencies>
++
++</project>
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java
 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java
+new file mode 100644
+index 000000000..6260639a1
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregator.java
+@@ -0,0 +1,83 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import io.druid.query.aggregation.Aggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++/**
++ * On-heap aggregator that remembers the most recent non-null, non-empty string read from its
++ * selector. Only {@link #get()} is supported; the numeric accessors always throw.
++ */
++public class ExtendColumnAggregator implements Aggregator
++{
++
++  private final ObjectColumnSelector selector;
++  private String result;
++
++  public ExtendColumnAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Replaces the held value with the selector's current value unless that value is null or empty.
++   */
++  @Override
++  public void aggregate()
++  {
++    final String current = (String) selector.getObject();
++
++    if (current != null && current.length() != 0) {
++      result = current;
++    }
++  }
++
++  @Override
++  public void reset()
++  {
++    result = null;
++  }
++
++  /**
++   * @return the held string, or null when no non-empty value has been aggregated
++   */
++  @Override
++  public Object get()
++  {
++    return result;
++  }
++
++  @Override
++  public void close()
++  {
++    result = null;
++  }
++
++  @Override
++  public float getFloat()
++  {
++    throw new UnsupportedOperationException("ExtendColumnAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong()
++  {
++    throw new UnsupportedOperationException("ExtendColumnAggregator does not support getLong()");
++  }
++
++  @Override
++  public double getDouble()
++  {
++    throw new UnsupportedOperationException("ExtendColumnAggregator does not support getDouble()");
++  }
++}
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java
 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java
+new file mode 100644
+index 000000000..a732c31de
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnAggregatorFactory.java
+@@ -0,0 +1,227 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import com.fasterxml.jackson.annotation.JsonCreator;
++import com.fasterxml.jackson.annotation.JsonProperty;
++import com.google.common.base.Preconditions;
++
++import io.druid.java.util.common.IAE;
++import io.druid.java.util.common.StringUtils;
++import io.druid.query.aggregation.Aggregator;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.aggregation.AggregatorUtil;
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ColumnSelectorFactory;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.nio.ByteBuffer;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.List;
++
++/**
++ * Aggregator factory for the {@code kylin-extendcolumn} complex metric. The aggregated value is
++ * a string: {@link #combine(Object, Object)} keeps the left value unless it is null or empty.
++ *
++ * <p>{@code precision} sizes the per-row buffer slot, see {@link #getMaxIntermediateSize()}.
++ * NOTE(review): it is presumably the maximum encoded byte length of a value - confirm against
++ * the Kylin side that produces the data.
++ */
++public class ExtendColumnAggregatorFactory extends AggregatorFactory
++{
++  private static final byte CACHE_TYPE_ID = 26;
++
++  private final String name;
++  private final String fieldName;
++  private final Integer precision;
++
++  public static final String EXTEND_COLUMN = "kylin-extendcolumn";
++
++  /**
++   * @param name      output name of the aggregation, must not be null
++   * @param fieldName input column to read, must not be null
++   * @param precision slot size parameter; may be null, but then buffered aggregation is rejected
++   */
++  @JsonCreator
++  public ExtendColumnAggregatorFactory(
++      @JsonProperty("name") String name,
++      @JsonProperty("fieldName") String fieldName,
++      @JsonProperty("precision") Integer precision
++  )
++  {
++    Preconditions.checkNotNull(name);
++    Preconditions.checkNotNull(fieldName);
++    this.name = name;
++    this.fieldName = fieldName;
++    this.precision = precision;
++  }
++
++  /**
++   * @throws IAE if the column selector factory has no selector for {@code fieldName}
++   */
++  @Override
++  public Aggregator factorize(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName);
++    if (selector == null) {
++      throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null");
++    } else {
++      return new ExtendColumnAggregator(selector);
++    }
++  }
++
++  /**
++   * @throws IAE if there is no selector for {@code fieldName}, or if the selector's declared
++   *             class is neither {@code Object} nor assignable to {@code String}
++   */
++  @Override
++  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
++  {
++    ObjectColumnSelector selector = columnFactory.makeObjectColumnSelector(fieldName);
++    // Same guard as factorize(): fail with a descriptive error instead of an NPE below.
++    if (selector == null) {
++      throw new IAE("selector in ExtendColumnAggregatorFactory should not be Null");
++    }
++
++    final Class classOfObject = selector.classOfObject();
++    if (!classOfObject.equals(Object.class) && !String.class.isAssignableFrom(classOfObject)) {
++      throw new IAE("Incompatible type for metric[%s], expected a String, got a %s", fieldName, classOfObject);
++    }
++
++    return new ExtendColumnBufferAggregator(selector);
++  }
++
++  /**
++   * Orders null/empty values before all others (and equal to each other); non-empty values are
++   * ordered by {@link String#compareTo(String)}. Unlike a one-sided check of the left operand,
++   * this satisfies the {@link Comparator} contract (compare(a, a) == 0, antisymmetry).
++   */
++  @Override
++  public Comparator getComparator()
++  {
++    return new Comparator()
++    {
++      @Override
++      public int compare(Object o1, Object o2)
++      {
++        String left = (String) o1;
++        String right = (String) o2;
++
++        boolean leftEmpty = left == null || left.length() == 0;
++        boolean rightEmpty = right == null || right.length() == 0;
++
++        if (leftEmpty || rightEmpty) {
++          if (leftEmpty && rightEmpty) {
++            return 0;
++          }
++          return leftEmpty ? -1 : 1;
++        }
++
++        return left.compareTo(right);
++      }
++    };
++  }
++
++  /**
++   * @return {@code rhs} when {@code lhs} is null or empty, otherwise {@code lhs}
++   */
++  @Override
++  public Object combine(Object lhs, Object rhs)
++  {
++    String left = (String) lhs;
++
++    if (left == null || left.length() == 0) {
++      return rhs;
++    }
++
++    return left;
++  }
++
++  @Override
++  public AggregatorFactory getCombiningFactory()
++  {
++    // The combining factory reads the column this factory writes, hence name is used twice.
++    return new ExtendColumnAggregatorFactory(name, name, precision);
++  }
++
++  @Override
++  public List<AggregatorFactory> getRequiredColumns()
++  {
++    return Arrays.<AggregatorFactory>asList(new ExtendColumnAggregatorFactory(fieldName, fieldName, precision));
++  }
++
++  @Override
++  public Object deserialize(Object object)
++  {
++    return object;
++  }
++
++  @Override
++  public Object finalizeComputation(Object object)
++  {
++    return object;
++  }
++
++  @JsonProperty
++  public String getFieldName()
++  {
++    return fieldName;
++  }
++
++  @Override
++  @JsonProperty
++  public String getName()
++  {
++    return name;
++  }
++
++  @JsonProperty
++  public Integer getPrecision()
++  {
++    return precision;
++  }
++
++  @Override
++  public List<String> requiredFields()
++  {
++    return Arrays.asList(fieldName);
++  }
++
++  /**
++   * Cache key layout: type id byte, UTF-8 bytes of {@code fieldName}, string separator byte.
++   */
++  @Override
++  public byte[] getCacheKey()
++  {
++    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
++    return ByteBuffer.allocate(2 + fieldNameBytes.length)
++                     .put(CACHE_TYPE_ID)
++                     .put(fieldNameBytes)
++                     .put(AggregatorUtil.STRING_SEPARATOR)
++                     .array();
++  }
++
++  @Override
++  public String getTypeName()
++  {
++    return EXTEND_COLUMN;
++  }
++
++  /**
++   * @return {@code precision + 2}; ExtendColumnBufferAggregator stores a 2-byte length prefix
++   *         in front of the value bytes
++   * @throws IllegalStateException if no precision was configured
++   */
++  @Override
++  public int getMaxIntermediateSize()
++  {
++    // precision is a nullable Integer; unboxing null would otherwise throw a bare NPE.
++    Preconditions.checkState(precision != null, "precision must be set for buffered aggregation of metric[%s]", name);
++    return precision + 2;
++  }
++
++  @Override
++  public boolean equals(Object o)
++  {
++    if (this == o) {
++      return true;
++    }
++    if (o == null || getClass() != o.getClass()) {
++      return false;
++    }
++
++    ExtendColumnAggregatorFactory that = (ExtendColumnAggregatorFactory) o;
++
++    if (!fieldName.equals(that.fieldName)) {
++      return false;
++    }
++
++    if (!name.equals(that.name)) {
++      return false;
++    }
++
++    // precision changes the buffer slot size, so factories differing in it are not equal.
++    return precision != null ? precision.equals(that.precision) : that.precision == null;
++  }
++
++  @Override
++  public int hashCode()
++  {
++    int result = name.hashCode();
++    result = 31 * result + fieldName.hashCode();
++    result = 31 * result + (precision != null ? precision.hashCode() : 0);
++    return result;
++  }
++
++  @Override
++  public String toString()
++  {
++    return "ExtendColumnAggregatorFactory {" + "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + '}';
++  }
++}
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java
 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java
+new file mode 100644
+index 000000000..55a57bc34
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnBufferAggregator.java
+@@ -0,0 +1,117 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import io.druid.query.aggregation.BufferAggregator;
++import io.druid.segment.ObjectColumnSelector;
++
++import java.nio.ByteBuffer;
++import java.nio.charset.Charset;
++
++/**
++ * Buffer-backed aggregator that stores the most recent non-null, non-empty string read from its
++ * selector. Slot layout starting at {@code position}: a 2-byte length (a {@code short}, 0 means
++ * "no value") followed by that many UTF-8 bytes.
++ *
++ * <p>All accesses use absolute reads/writes or a {@link ByteBuffer#duplicate() duplicate} of the
++ * buffer, so the caller's position and limit are never modified, not even when an access throws.
++ *
++ * <p>NOTE(review): this class does not know how many bytes were reserved for its slot; the
++ * caller must reserve at least the encoded value length plus 2 bytes.
++ */
++public class ExtendColumnBufferAggregator implements BufferAggregator
++{
++  private final ObjectColumnSelector selector;
++  private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
++
++  public ExtendColumnBufferAggregator(ObjectColumnSelector selector)
++  {
++    this.selector = selector;
++  }
++
++  /**
++   * Marks the slot at {@code position} as empty by writing a zero length.
++   */
++  @Override
++  public void init(ByteBuffer buf, int position)
++  {
++    buf.putShort(position, (short) 0);
++  }
++
++  /**
++   * Overwrites the slot with the selector's current value; null and empty values are ignored.
++   *
++   * @throws IllegalArgumentException if the UTF-8 encoding of the value is longer than
++   *                                  {@link Short#MAX_VALUE} bytes and therefore cannot be
++   *                                  described by the 2-byte length prefix
++   */
++  @Override
++  public void aggregate(ByteBuffer buf, int position)
++  {
++    String tmp = (String) selector.getObject();
++
++    if (tmp == null || tmp.length() == 0) {
++      return;
++    }
++
++    byte[] bytes = tmp.getBytes(UTF8_CHARSET);
++    // A plain (short) cast would silently wrap to a negative or truncated length.
++    if (bytes.length > Short.MAX_VALUE) {
++      throw new IllegalArgumentException(
++          "Extend column value is " + bytes.length + " bytes, which exceeds the maximum of " + Short.MAX_VALUE
++      );
++    }
++
++    // Payload first: a bulk put checks the remaining space before writing anything, so on
++    // overflow the previously stored length and value are left untouched.
++    ByteBuffer payload = buf.duplicate();
++    payload.position(position + 2);
++    payload.put(bytes);
++
++    // The length goes through buf itself so that buf's byte order is the one used.
++    buf.putShort(position, (short) bytes.length);
++  }
++
++  /**
++   * @return the stored string, or null when the slot holds no value
++   * @throws IllegalStateException if the stored length is negative
++   */
++  @Override
++  public Object get(ByteBuffer buf, int position)
++  {
++    short size = buf.getShort(position);
++    if (size == 0) {
++      return null;
++    }
++    if (size < 0) {
++      throw new IllegalStateException("Corrupt extend column slot: negative length " + size);
++    }
++
++    byte[] bytes = new byte[size];
++    ByteBuffer payload = buf.duplicate();
++    payload.position(position + 2);
++    payload.get(bytes);
++
++    return new String(bytes, UTF8_CHARSET);
++  }
++
++  @Override
++  public float getFloat(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getFloat()");
++  }
++
++  @Override
++  public long getLong(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getLong()");
++  }
++
++  @Override
++  public double getDouble(ByteBuffer buf, int position)
++  {
++    throw new UnsupportedOperationException("ExtendColumnBufferAggregator does not support getDouble()");
++  }
++
++  @Override
++  public void close()
++  {
++  }
++}
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java
 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java
+new file mode 100644
+index 000000000..1c383d97a
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnDruidModule.java
+@@ -0,0 +1,51 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.jsontype.NamedType;
++import com.fasterxml.jackson.databind.module.SimpleModule;
++import com.google.common.collect.ImmutableList;
++import com.google.inject.Binder;
++import io.druid.initialization.DruidModule;
++import io.druid.segment.serde.ComplexMetrics;
++
++import java.util.List;
++
++/**
++ * Druid module for the extend-column metric: exposes the Jackson subtype of
++ * {@link ExtendColumnAggregatorFactory} and registers {@link ExtendColumnSerde}.
++ */
++public class ExtendColumnDruidModule implements DruidModule
++{
++  /**
++   * @return a single Jackson module mapping the {@code EXTEND_COLUMN} type name to
++   *         {@link ExtendColumnAggregatorFactory}
++   */
++  @Override
++  public List<? extends Module> getJacksonModules()
++  {
++    final SimpleModule jacksonModule = new SimpleModule("KylinExtendColumnModule");
++    final NamedType aggregatorType = new NamedType(
++        ExtendColumnAggregatorFactory.class,
++        ExtendColumnAggregatorFactory.EXTEND_COLUMN
++    );
++    return ImmutableList.of(jacksonModule.registerSubtypes(aggregatorType));
++  }
++
++  /**
++   * Registers the serde for the {@code EXTEND_COLUMN} type unless one is already registered.
++   */
++  @Override
++  public void configure(Binder binder)
++  {
++    final String typeName = ExtendColumnAggregatorFactory.EXTEND_COLUMN;
++    if (ComplexMetrics.getSerdeForType(typeName) != null) {
++      return;
++    }
++    ComplexMetrics.registerSerde(typeName, new ExtendColumnSerde());
++  }
++}
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java
 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java
+new file mode 100644
+index 000000000..9969d232b
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/java/io/druid/query/aggregation/kylin.extendcolumn/ExtendColumnSerde.java
+@@ -0,0 +1,139 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import io.druid.data.input.InputRow;
++import io.druid.java.util.common.IAE;
++import io.druid.segment.GenericColumnSerializer;
++import io.druid.segment.column.ColumnBuilder;
++import io.druid.segment.data.GenericIndexed;
++import io.druid.segment.data.IOPeon;
++import io.druid.segment.data.ObjectStrategy;
++import io.druid.segment.serde.ComplexColumnPartSupplier;
++import io.druid.segment.serde.ComplexMetricExtractor;
++import io.druid.segment.serde.ComplexMetricSerde;
++import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
++
++import java.nio.ByteBuffer;
++import java.nio.charset.Charset;
++
++/**
++ * {@link ComplexMetricSerde} for the Kylin extend-column measure. Metric values
++ * are {@link String}s and are stored as their UTF-8 bytes.
++ */
++public class ExtendColumnSerde extends ComplexMetricSerde
++{
++
++  private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
++
++  public ExtendColumnSerde()
++  {
++  }
++
++  /**
++   * Returns the type name shared with {@link ExtendColumnAggregatorFactory}.
++   */
++  @Override
++  public String getTypeName()
++  {
++    return ExtendColumnAggregatorFactory.EXTEND_COLUMN;
++  }
++
++  /**
++   * Returns an extractor that reads the raw metric value from an input row and
++   * requires it to be a {@link String}.
++   */
++  @Override
++  public ComplexMetricExtractor getExtractor()
++  {
++    return new ComplexMetricExtractor()
++    {
++      @Override
++      public Class<String> extractedClass()
++      {
++        return String.class;
++      }
++
++      /**
++       * @param inputRow   row to read from
++       * @param metricName name of the raw column holding the value
++       *
++       * @return the raw value, unchanged, when it is a String
++       *
++       * @throws IAE if the raw value is missing or is not a String
++       */
++      @Override
++      public String extractValue(InputRow inputRow, String metricName)
++      {
++        final Object rawValue = inputRow.getRaw(metricName);
++
++        // instanceof is false for null, so a missing value is reported with a
++        // descriptive IAE instead of surfacing as a bare NullPointerException.
++        if (rawValue instanceof String) {
++          return (String) rawValue;
++        }
++
++        throw new IAE(
++            "Metric[%s] must be a String, but got class[%s]",
++            metricName,
++            rawValue == null ? null : rawValue.getClass().getName()
++        );
++      }
++    };
++  }
++
++  /**
++   * Reads the serialized column from {@code byteBuffer} using
++   * {@link #getObjectStrategy()} and attaches it to {@code columnBuilder} as a
++   * complex column.
++   */
++  @Override
++  public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
++  {
++    final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());
++    columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));
++  }
++
++  /**
++   * Returns a new strategy that converts between Strings and their UTF-8 bytes.
++   */
++  @Override
++  public ObjectStrategy getObjectStrategy()
++  {
++    return new ObjectStrategy()
++    {
++      @Override
++      public Class getClazz()
++      {
++        return String.class;
++      }
++
++      /**
++       * Decodes {@code numBytes} bytes, starting at the buffer's current
++       * position, as a UTF-8 String. The read goes through a read-only
++       * duplicate, so the position of {@code buffer} itself is not moved.
++       */
++      @Override
++      public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
++      {
++        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
++        readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
++
++        final byte[] bytes = new byte[readOnlyBuffer.remaining()];
++        readOnlyBuffer.get(bytes, 0, bytes.length);
++        return new String(bytes, UTF8_CHARSET);
++      }
++
++      /**
++       * Encodes a String as UTF-8 bytes; {@code null} becomes an empty array.
++       *
++       * @throws IAE if {@code val} is neither null nor a String
++       */
++      @Override
++      public byte[] toBytes(Object val)
++      {
++        if (val == null) {
++          return new byte[0];
++        }
++
++        if (val instanceof String) {
++          return ((String) val).getBytes(UTF8_CHARSET);
++        }
++
++        throw new IAE("Unknown class[%s], toString[%s]", val.getClass(), val);
++      }
++
++      /**
++       * Orders values by natural String order, treating {@code null} as the
++       * empty String (the same way {@link #toBytes(Object)} serializes it).
++       * Null/empty values therefore still sort before non-empty ones, but the
++       * ordering is now consistent: equal values compare as 0 and swapping the
++       * arguments flips the sign.
++       */
++      @Override
++      public int compare(Object o1, Object o2)
++      {
++        final String left = o1 == null ? "" : (String) o1;
++        final String right = o2 == null ? "" : (String) o2;
++        return left.compareTo(right);
++      }
++    };
++  }
++
++  /**
++   * Creates the column serializer for {@code column}, backed by {@code peon}
++   * and this serde's object strategy.
++   */
++  @Override
++  public GenericColumnSerializer getSerializer(IOPeon peon, String column)
++  {
++    return LargeColumnSupportedComplexColumnSerializer.create(peon, column, this.getObjectStrategy());
++  }
++}
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
 
b/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+new file mode 100644
+index 000000000..18982d1ae
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+@@ -0,0 +1 @@
++io.druid.query.aggregation.kylin.extendcolumn.ExtendColumnDruidModule
+diff --git 
a/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java
 
b/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java
+new file mode 100644
+index 000000000..2267d07d3
+--- /dev/null
++++ 
b/extensions-contrib/kylin-extendcolumn/src/test/java/io.druid.query.aggregation.kylin.extendcolumn/ExtendColumnGroupByQueryTest.java
+@@ -0,0 +1,229 @@
++/*
++ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. Metamarkets licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package io.druid.query.aggregation.kylin.extendcolumn;
++
++import com.fasterxml.jackson.databind.Module;
++import com.fasterxml.jackson.databind.ObjectMapper;
++import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.Lists;
++import io.druid.data.input.MapBasedInputRow;
++import io.druid.data.input.Row;
++import io.druid.jackson.DefaultObjectMapper;
++import io.druid.java.util.common.Intervals;
++import io.druid.query.QueryRunnerTestHelper;
++import io.druid.query.aggregation.AggregatorFactory;
++import io.druid.query.dimension.DefaultDimensionSpec;
++import io.druid.query.dimension.DimensionSpec;
++import io.druid.query.groupby.GroupByQuery;
++import io.druid.query.groupby.GroupByQueryConfig;
++import io.druid.query.groupby.GroupByQueryRunnerFactory;
++import io.druid.query.groupby.GroupByQueryRunnerTest;
++import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
++import io.druid.query.groupby.orderby.DefaultLimitSpec;
++import io.druid.query.groupby.orderby.OrderByColumnSpec;
++import io.druid.segment.IndexIO;
++import io.druid.segment.IndexMerger;
++import io.druid.segment.IndexMergerTestBase;
++import io.druid.segment.IndexMergerV9;
++import io.druid.segment.IndexSpec;
++import io.druid.segment.QueryableIndex;
++import io.druid.segment.QueryableIndexSegment;
++import io.druid.segment.TestHelper;
++import io.druid.segment.column.ColumnConfig;
++import io.druid.segment.data.CompressedObjectStrategy;
++import io.druid.segment.data.CompressionFactory;
++import io.druid.segment.data.ConciseBitmapSerdeFactory;
++import io.druid.segment.incremental.IncrementalIndex;
++import io.druid.segment.incremental.IncrementalIndexSchema;
++import io.druid.segment.serde.ComplexMetrics;
++import org.apache.commons.io.FileUtils;
++import org.junit.Test;
++
++import java.io.File;
++import java.nio.charset.Charset;
++import java.util.Arrays;
++import java.util.List;
++
++/**
++ * Persists a small segment containing an extend-column metric and checks the
++ * result of a groupBy query that aggregates it.
++ */
++public class ExtendColumnGroupByQueryTest
++{
++
++  private static final String VISITOR_ID = "visitor_id";
++  private static final String CLIENT_TYPE = "client_type";
++  // Used both as the raw input column name and as the aggregator output name.
++  private static final String METRIC_NAME = "kylin";
++
++  private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec(
++      new ConciseBitmapSerdeFactory(),
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressedObjectStrategy.CompressionStrategy.LZ4,
++      CompressionFactory.LongEncodingStrategy.LONGS
++  );
++
++  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
++
++  private static final IndexIO INDEX_IO;
++
++  private static final IndexMerger INDEX_MERGER;
++
++  static {
++    // Mirror what ExtendColumnDruidModule does at startup: register the serde
++    // once and add the Jackson subtypes to the mapper used for segment IO.
++    if (ComplexMetrics.getSerdeForType(ExtendColumnAggregatorFactory.EXTEND_COLUMN) == null) {
++      ComplexMetrics.registerSerde(ExtendColumnAggregatorFactory.EXTEND_COLUMN, new ExtendColumnSerde());
++    }
++
++    for (Module mod : new ExtendColumnDruidModule().getJacksonModules()) {
++      JSON_MAPPER.registerModule(mod);
++    }
++
++    INDEX_IO = new IndexIO(JSON_MAPPER, new ColumnConfig()
++    {
++      @Override
++      public int columnCacheSizeBytes()
++      {
++        return 0;
++      }
++    });
++
++    INDEX_MERGER = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
++  }
++
++  // NOTE(review): the method name mentions "DistinctCountAgg" although the test
++  // exercises ExtendColumnAggregatorFactory; the name is kept unchanged so the
++  // test identifier stays stable.
++  @Test
++  public void testGroupByWithDistinctCountAgg() throws Exception
++  {
++    final GroupByQueryConfig config = new GroupByQueryConfig();
++    config.setMaxIntermediateRows(100000000);
++    final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
++
++    final long timestamp = System.currentTimeMillis();
++
++    final IncrementalIndex index = new IncrementalIndex.Builder()
++        .setIndexSchema(
++            new IncrementalIndexSchema.Builder()
++                .withMetrics(new AggregatorFactory[]{newAggregatorFactory()})
++                .withRollup(false)
++                .build()
++        )
++        .setReportParseExceptions(false)
++        .setConcurrentEventAdd(true)
++        .setMaxRowCount(1000000)
++        .buildOnheap();
++
++    final Charset utf8 = Charset.forName("UTF-8");
++    final String emptyValue = "";
++    // Multi-byte value, round-tripped through its UTF-8 bytes as in the original test data.
++    final String multiByteValue = new String("中国".getBytes(utf8), utf8);
++    final String asciiValue = "342";
++
++    index.add(newRow(timestamp, "iphone", emptyValue));
++    index.add(newRow(timestamp, "iphone", multiByteValue));
++    index.add(newRow(timestamp, "android", asciiValue));
++
++    // Persist into a fresh temporary directory instead of a relative "tmp" path,
++    // and delete it in a finally block so a failing assertion leaves nothing behind.
++    final File segmentDir = java.nio.file.Files.createTempDirectory("extend-column-test").toFile();
++    try {
++      INDEX_MERGER.persist(
++          index,
++          Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"),
++          segmentDir,
++          INDEX_SPEC
++      );
++
++      final QueryableIndex queryableIndex = INDEX_IO.loadIndex(segmentDir);
++      final QueryableIndexSegment segment = new QueryableIndexSegment("index", queryableIndex);
++
++      final GroupByQuery query = new GroupByQuery.Builder()
++          .setDataSource(QueryRunnerTestHelper.dataSource)
++          .setGranularity(QueryRunnerTestHelper.allGran)
++          .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec(CLIENT_TYPE, CLIENT_TYPE)))
++          .setInterval(QueryRunnerTestHelper.fullOnInterval)
++          .setLimitSpec(
++              new DefaultLimitSpec(
++                  Lists.newArrayList(
++                      new OrderByColumnSpec(CLIENT_TYPE, OrderByColumnSpec.Direction.DESCENDING)
++                  ),
++                  10
++              )
++          )
++          .setAggregatorSpecs(Lists.newArrayList(newAggregatorFactory()))
++          .build();
++
++      final Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
++          factory,
++          factory.createRunner(segment),
++          query
++      );
++
++      // Rows are ordered by client_type descending. The "iphone" group holds
++      // both the empty and the multi-byte value and is expected to yield the
++      // multi-byte one.
++      final List<Row> expectedResults = Arrays.asList(
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              CLIENT_TYPE, "iphone",
++              METRIC_NAME, multiByteValue
++          ),
++          GroupByQueryRunnerTestHelper.createExpectedRow(
++              "1970-01-01T00:00:00.000Z",
++              CLIENT_TYPE, "android",
++              METRIC_NAME, asciiValue
++          )
++      );
++
++      TestHelper.assertExpectedObjects(expectedResults, results, "extend-column");
++    }
++    finally {
++      FileUtils.deleteDirectory(segmentDir);
++    }
++  }
++
++  /** Builds the aggregator used for both indexing and querying. */
++  private static AggregatorFactory newAggregatorFactory()
++  {
++    return new ExtendColumnAggregatorFactory(METRIC_NAME, METRIC_NAME, 100);
++  }
++
++  /** Builds an input row for visitor "0" with the given client type and extend-column value. */
++  private static MapBasedInputRow newRow(long timestamp, String clientType, String extendValue)
++  {
++    return new MapBasedInputRow(
++        timestamp,
++        Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
++        ImmutableMap.<String, Object>of(
++            VISITOR_ID,
++            "0",
++            CLIENT_TYPE,
++            clientType,
++            METRIC_NAME,
++            extendValue
++        )
++    );
++  }
++}
+diff --git a/pom.xml b/pom.xml
+index bc5e967b3..59cfeb6d1 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -130,6 +130,9 @@
+         <module>extensions-contrib/rabbitmq</module>
+         <module>extensions-contrib/distinctcount</module>
+         <module>extensions-contrib/parquet-extensions</module>
++        <module>extensions-contrib/kylin-distinccount</module>
++        <module>extensions-contrib/kylin-extendcolumn</module>
++        <module>extensions-contrib/decimal</module>
+         <module>extensions-contrib/statsd-emitter</module>
+         <module>extensions-contrib/orc-extensions</module>
+         <module>extensions-contrib/time-min-max</module>
+diff --git 
a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java 
b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
+index 632777f8d..5d81f3964 100644
+--- 
a/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
++++ 
b/processing/src/main/java/io/druid/jackson/DruidDefaultSerializersModule.java
+@@ -35,6 +35,7 @@ import io.druid.java.util.common.guava.Yielder;
+ import org.joda.time.DateTimeZone;
+ 
+ import java.io.IOException;
++import java.math.BigDecimal;
+ import java.nio.ByteOrder;
+ import java.util.TimeZone;
+ 
+@@ -48,6 +49,23 @@ public class DruidDefaultSerializersModule extends 
SimpleModule
+ 
+     JodaStuff.register(this);
+ 
++    addSerializer(
++            BigDecimal.class,
++            new JsonSerializer<BigDecimal>()
++            {
++              @Override
++              public void serialize(
++                      BigDecimal bigDecimal,
++                      JsonGenerator jsonGenerator,
++                      SerializerProvider serializerProvider
++              )
++                      throws IOException, JsonProcessingException
++              {
++                jsonGenerator.writeString(bigDecimal.toString());
++              }
++            }
++    );
++
+     addDeserializer(
+         DateTimeZone.class,
+         new JsonDeserializer<DateTimeZone>()
+diff --git 
a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java 
b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java
+index f5196d062..9a96bd060 100644
+--- a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java
++++ b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java
+@@ -109,7 +109,7 @@ public class IndexMergerTestBase
+     );
+   }
+ 
+-  static IndexSpec makeIndexSpec(
++  public static IndexSpec makeIndexSpec(
+       BitmapSerdeFactory bitmapSerdeFactory,
+       CompressedObjectStrategy.CompressionStrategy compressionStrategy,
+       CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy,
+diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java 
b/processing/src/test/java/io/druid/segment/TestHelper.java
+index 5ca389305..6ca65ebc7 100644
+--- a/processing/src/test/java/io/druid/segment/TestHelper.java
++++ b/processing/src/test/java/io/druid/segment/TestHelper.java
+@@ -22,6 +22,7 @@ package io.druid.segment;
+ import com.fasterxml.jackson.databind.InjectableValues;
+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.google.common.collect.Lists;
++import io.druid.collections.bitmap.ImmutableBitmap;
+ import io.druid.data.input.MapBasedRow;
+ import io.druid.data.input.Row;
+ import io.druid.jackson.DefaultObjectMapper;
+@@ -318,7 +319,13 @@ public class TestHelper
+       final Object expectedValue = expectedMap.get(key);
+       final Object actualValue = actualMap.get(key);
+ 
+-      if (expectedValue instanceof Float || expectedValue instanceof Double) {
++      if (expectedValue instanceof ImmutableBitmap || actualValue instanceof 
ImmutableBitmap) {
++        Assert.assertEquals(
++            StringUtils.format("%s: key[%s]", msg, key),
++            ((ImmutableBitmap) expectedValue).size(),
++            ((ImmutableBitmap) actualValue).size()
++        );
++      } else if (expectedValue instanceof Float || expectedValue instanceof 
Double) {
+         Assert.assertEquals(
+             StringUtils.format("%s: key[%s]", msg, key),
+             ((Number) expectedValue).doubleValue(),
+-- 
+2.17.2 (Apple Git-113)
+

Reply via email to