This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bd86e7bdc6 Add support for Apache Datasketches CPC sketch (#11774)
bd86e7bdc6 is described below

commit bd86e7bdc6df22846207b9f496ccab703e0ad462
Author: David Cromberge <[email protected]>
AuthorDate: Wed Oct 18 21:37:27 2023 +0100

    Add support for Apache Datasketches CPC sketch (#11774)
---
 .../function/AggregationFunctionTypeTest.java      |   8 +-
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  31 +-
 .../core/function/scalar/SketchFunctions.java      | 116 ++++-
 .../function/AggregationFunctionFactory.java       |   4 +
 .../DistinctCountCPCSketchAggregationFunction.java | 540 +++++++++++++++++++++
 ...stinctCountRawCPCSketchAggregationFunction.java |  53 ++
 ....java => DistinctCountCPCSketchAggregator.java} |  32 +-
 .../DistinctCountThetaSketchAggregator.java        |   6 +-
 .../aggregator/ValueAggregatorFactory.java         |   3 +
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  18 +
 .../core/function/scalar/SketchFunctionsTest.java  |  18 +-
 .../v2/DistinctCountCPCSketchStarTreeV2Test.java   |  52 ++
 .../DistinctCountThetaSketchQueriesTest.java       |   2 +-
 .../integration/tests/custom/CpcSketchTest.java    | 198 ++++++++
 .../DistinctCountCPCSketchValueAggregator.java     | 193 ++++++++
 .../DistinctCountThetaSketchValueAggregator.java   |   4 +-
 .../local/aggregator/ValueAggregatorFactory.java   |   6 +
 .../local/customobject/SerializedCPCSketch.java    |  46 ++
 .../segment/local/utils/CustomSerDeUtils.java      |  23 +-
 .../segment/local/utils/TableConfigUtils.java      |  10 +-
 .../DistinctCountCPCSketchValueAggregatorTest.java | 161 ++++++
 .../pinot/segment/spi/AggregationFunctionType.java | 103 ++--
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 .../org/apache/pinot/tools/SegmentDumpTool.java    |  19 +-
 24 files changed, 1554 insertions(+), 94 deletions(-)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index f7a9ca16b0..b12cda67c9 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -34,9 +34,9 @@ public class AggregationFunctionTypeTest {
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), 
AggregationFunctionType.AVG);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), 
AggregationFunctionType.MODE);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("FiRsTwItHtImE"),
-            AggregationFunctionType.FIRSTWITHTIME);
+        AggregationFunctionType.FIRSTWITHTIME);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
-            AggregationFunctionType.LASTWITHTIME);
+        AggregationFunctionType.LASTWITHTIME);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
         AggregationFunctionType.MINMAXRANGE);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnT"),
@@ -79,6 +79,10 @@ public class AggregationFunctionTypeTest {
         AggregationFunctionType.PERCENTILERAWEST);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt99"),
         AggregationFunctionType.PERCENTILERAWTDIGEST);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTcPcSkEtCh"),
+        AggregationFunctionType.DISTINCTCOUNTCPCSKETCH);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnTrAwCpCsKeTcH"),
+        AggregationFunctionType.DISTINCTCOUNTRAWCPCSKETCH);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 3b7d187bc7..45f1ebd489 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.datasketches.frequencies.ItemsSketch;
 import org.apache.datasketches.frequencies.LongsSketch;
 import org.apache.datasketches.kll.KllDoublesSketch;
@@ -142,8 +143,8 @@ public class ObjectSerDeUtils {
     IntegerTupleSketch(37),
     FrequentStringsSketch(38),
     FrequentLongsSketch(39),
-    HyperLogLogPlus(40);
-
+    HyperLogLogPlus(40),
+    CompressedProbabilisticCounting(41);
 
     private final int _value;
 
@@ -241,6 +242,8 @@ public class ObjectSerDeUtils {
         return ObjectType.FrequentLongsSketch;
       } else if (value instanceof HyperLogLogPlus) {
         return ObjectType.HyperLogLogPlus;
+      } else if (value instanceof CpcSketch) {
+        return ObjectType.CompressedProbabilisticCounting;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -952,7 +955,7 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<Sketch> DATA_SKETCH_SER_DE = new 
ObjectSerDe<Sketch>() {
+  public static final ObjectSerDe<Sketch> DATA_SKETCH_THETA_SER_DE = new 
ObjectSerDe<Sketch>() {
 
     @Override
     public byte[] serialize(Sketch value) {
@@ -1016,6 +1019,25 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<CpcSketch> DATA_SKETCH_CPC_SER_DE = new 
ObjectSerDe<CpcSketch>() {
+    @Override
+    public byte[] serialize(CpcSketch value) {
+      return value.toByteArray();
+    }
+
+    @Override
+    public CpcSketch deserialize(byte[] bytes) {
+      return CpcSketch.heapify(Memory.wrap(bytes));
+    }
+
+    @Override
+    public CpcSketch deserialize(ByteBuffer byteBuffer) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return CpcSketch.heapify(Memory.wrap(bytes));
+    }
+  };
+
   public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new 
ObjectSerDe<Geometry>() {
 
     @Override
@@ -1383,7 +1405,7 @@ public class ObjectSerDeUtils {
       INT_SET_SER_DE,
       TDIGEST_SER_DE,
       DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE,
+      DATA_SKETCH_THETA_SER_DE,
       GEOMETRY_SER_DE,
       ROARING_BITMAP_SER_DE,
       LONG_SET_SER_DE,
@@ -1412,6 +1434,7 @@ public class ObjectSerDeUtils {
       FREQUENT_STRINGS_SKETCH_SER_DE,
       FREQUENT_LONGS_SKETCH_SER_DE,
       HYPER_LOG_LOG_PLUS_SER_DE,
+      DATA_SKETCH_CPC_SER_DE,
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 8b0b72d1d1..f26a062b10 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -22,6 +22,8 @@ import 
com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import java.math.BigDecimal;
 import java.util.Base64;
 import javax.annotation.Nullable;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
@@ -68,6 +70,14 @@ import org.apache.pinot.spi.utils.CommonConstants;
  *     {
  *       "columnName": "names",
  *       "transformFunction": "toHLL(playerName, 8)"
+ *     },
+ *     {
+ *       "columnName": "players",
+ *       "transformFunction": "toCpcSketch(playerID)"
+ *     },
+ *     {
+ *       "columnName": "players",
+ *       "transformFunction": "toCpcSketch(playerID, 11)"
  *     }
  *   ]
  * }
@@ -119,7 +129,7 @@ public class SketchFunctions {
             "Unrecognised input type for Theta sketch: " + 
input.getClass().getSimpleName());
       }
     }
-    return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact());
+    return 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(sketch.compact());
   }
 
   /**
@@ -274,8 +284,8 @@ public class SketchFunctions {
     } else if (sketchObj instanceof byte[]) {
       return Sketches.wrapSketch(Memory.wrap((byte[]) sketchObj));
     } else {
-      throw new RuntimeException("Exception occurred getting estimate from 
Theta Sketch, unsupported Object type: "
-          + sketchObj.getClass());
+      throw new RuntimeException(
+          "Exception occurred getting estimate from Theta Sketch, unsupported 
Object type: " + sketchObj.getClass());
     }
   }
 
@@ -310,9 +320,8 @@ public class SketchFunctions {
   }
 
   private static byte[] intTupleSketchUnionVar(IntegerSummary.Mode mode, int 
nomEntries, Object... sketchObjects) {
-    org.apache.datasketches.tuple.Union<IntegerSummary>
-        union = new org.apache.datasketches.tuple.Union<>(nomEntries,
-        new IntegerSummarySetOperations(mode, mode));
+    org.apache.datasketches.tuple.Union<IntegerSummary> union =
+        new org.apache.datasketches.tuple.Union<>(nomEntries, new 
IntegerSummarySetOperations(mode, mode));
     for (Object sketchObj : sketchObjects) {
       union.union(asIntegerSketch(sketchObj));
     }
@@ -360,8 +369,8 @@ public class SketchFunctions {
     } else if (sketchObj instanceof byte[]) {
       return 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) sketchObj);
     } else {
-      throw new RuntimeException("Exception occurred getting reading Tuple 
Sketch, unsupported Object type: "
-          + sketchObj.getClass());
+      throw new RuntimeException(
+          "Exception occurred getting reading Tuple Sketch, unsupported Object 
type: " + sketchObj.getClass());
     }
   }
 
@@ -369,4 +378,95 @@ public class SketchFunctions {
   public static long getIntTupleSketchEstimate(Object o1) {
     return Math.round(asIntegerSketch(o1).getEstimate());
   }
+
+  /**
+   * Create a CPC Sketch containing the input
+   *
+   * @param input an Object we want to insert into the sketch, may be null to 
return an empty sketch
+   * @return serialized CPC sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toCpcSketch(@Nullable Object input) {
+    return toCpcSketch(input, CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+  }
+
+  @ScalarFunction(names = {"getCpcSketchEstimate", "get_cpc_sketch_estimate"})
+  public static long getCpcSketchEstimate(Object o1) {
+    return Math.round(asCpcSketch(o1).getEstimate());
+  }
+
+  @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+  public static byte[] cpcSketchUnion(Object o1, Object o2) {
+    return cpcSketchUnionVar(o1, o2);
+  }
+
+  @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+  public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3) {
+    return cpcSketchUnionVar(o1, o2, o3);
+  }
+
+  @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+  public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object 
o4) {
+    return cpcSketchUnionVar(o1, o2, o3, o4);
+  }
+
+  @ScalarFunction(names = {"cpcSketchUnion", "cpc_sketch_union"})
+  public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object 
o4, Object o5) {
+    return cpcSketchUnionVar(o1, o2, o3, o4, o5);
+  }
+
+  /**
+   * Create a CPC Sketch containing the input, with a configured lgK (log2 of 
the nominal entries)
+   *
+   * @param input an Object we want to insert into the sketch, may be null to 
return an empty sketch
+   * @param lgK the given log_base2 of k, which is the nominal entries that 
the sketch is configured to keep
+   * @return serialized CPC sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toCpcSketch(@Nullable Object input, int lgK) {
+    CpcSketch sketch = new CpcSketch(lgK);
+    if (input != null) {
+      if (input instanceof Integer) {
+        sketch.update((Integer) input);
+      } else if (input instanceof Long) {
+        sketch.update((Long) input);
+      } else if (input instanceof Float) {
+        sketch.update((Float) input);
+      } else if (input instanceof Double) {
+        sketch.update((Double) input);
+      } else if (input instanceof BigDecimal) {
+        sketch.update(((BigDecimal) input).toString());
+      } else if (input instanceof String) {
+        sketch.update((String) input);
+      } else if (input instanceof byte[]) {
+        sketch.update((byte[]) input);
+      } else {
+        throw new IllegalArgumentException(
+            "Unrecognised input type for CPC sketch: " + 
input.getClass().getSimpleName());
+      }
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(sketch);
+  }
+
+  private static CpcSketch asCpcSketch(Object sketchObj) {
+    if (sketchObj instanceof CpcSketch) {
+      return (CpcSketch) sketchObj;
+    } else if (sketchObj instanceof byte[]) {
+      return CpcSketch.heapify(Memory.wrap((byte[]) sketchObj));
+    } else if (sketchObj instanceof String) {
+      byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+      return CpcSketch.heapify(Memory.wrap((decoded)));
+    } else {
+      throw new RuntimeException(
+          "Exception occurred getting estimate from CPC Sketch, unsupported 
Object type: " + sketchObj.getClass());
+    }
+  }
+
+  private static byte[] cpcSketchUnionVar(Object... sketchObjects) {
+    CpcUnion union = new 
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    for (Object sketchObj : sketchObjects) {
+      union.update(asCpcSketch(sketchObj));
+    }
+    return union.getResult().toByteArray();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index f83fe090c0..0480141f17 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -364,6 +364,10 @@ public class AggregationFunctionFactory {
             return new FrequentStringsSketchAggregationFunction(arguments);
           case FREQUENTLONGSSKETCH:
             return new FrequentLongsSketchAggregationFunction(arguments);
+          case DISTINCTCOUNTCPCSKETCH:
+            return new DistinctCountCPCSketchAggregationFunction(arguments);
+          case DISTINCTCOUNTRAWCPCSKETCH:
+            return new DistinctCountRawCPCSketchAggregationFunction(arguments);
 
           default:
             throw new IllegalArgumentException("Unsupported aggregation 
function type: " + functionType);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
new file mode 100644
index 0000000000..1946200842
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountCPCSketchAggregationFunction.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountCPCSketchAggregationFunction} is used for 
space-efficient cardinality estimation.
+ * The Apache Datasketches CPC sketch is a unique-counting sketch that 
implements the
+ * <i>Compressed Probabilistic Counting (CPC, a.k.a FM85)</i> algorithms 
developed by Kevin Lang in his paper
+ * <a href="https://arxiv.org/abs/1708.06839">Back to the Future: an Even More 
Nearly Optimal Cardinality Estimation
+ * Algorithm</a>.
+ * <br><br>
+ * The stored CPC sketch can consume about 40% less space than an HLL sketch 
of comparable accuracy. CPC sketches have
+ * been intentionally designed to offer different tradeoffs to HLL sketches so 
that they complement each
+ * other in many ways.  For more information, see the Apache Datasketches 
documentation.
+ * <br><br>
+ * The aggregation function supports both pre-aggregated sketches and raw 
values, but no post-aggregation is supported.
+ * Usage examples:
+ * <ul>
+ *   <li>
+ *     Simple union (1 or 2 arguments): main expression to aggregate on, 
followed by an optional CPC sketch size
+ *     argument. The second argument is the sketch lgK – the given log_base2 
of k, and defaults to 12.
+ *     The "raw" equivalents return serialised sketches in base64-encoded 
strings.
+ *     <p>DISTINCT_COUNT_CPC_SKETCH(col)</p>
+ *     <p>DISTINCT_COUNT_CPC_SKETCH(col, 12)</p>
+ *     <p>DISTINCT_COUNT_RAW_CPC_SKETCH(col)</p>
+ *     <p>DISTINCT_COUNT_RAW_CPC_SKETCH(col, 12)</p>
+ *   <li>
+ *     Extracting a cardinality estimate from a CPC sketch:
+ *     <p>GET_CPC_SKETCH_ESTIMATE(sketch_bytes)</p>
+ *     <p>GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(col))</p>
+ *   </li>
+ *   <li>
+ *     Union between two sketches:
+ *     <p>
+ *       CPC_SKETCH_UNION(
+ *         DISTINCT_COUNT_RAW_CPC_SKETCH(col1),
+ *         DISTINCT_COUNT_RAW_CPC_SKETCH(col2)
+ *       )
+ *     </p>
+ *   </li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes"})
+public class DistinctCountCPCSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<CpcSketch, Comparable> {
+  protected final int _lgK;
+
+  public DistinctCountCPCSketchAggregationFunction(List<ExpressionContext> 
arguments) {
+    super(arguments.get(0));
+    int numExpressions = arguments.size();
+    // This function expects 1 or 2 arguments - it is a code smell to extend 
the base for single
+    // input aggregation functions.  Nevertheless, there are other functions 
in the base class that
+    // apply here.  See also: Theta sketch aggregation function.
+    Preconditions.checkArgument(numExpressions <= 2, "DistinctCountCPC expects 
1 or 2 arguments, got: %s",
+        numExpressions);
+    if (arguments.size() == 2) {
+      _lgK = arguments.get(1).getLiteral().getIntValue();
+    } else {
+      _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTCPCSKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized CPC Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        CpcSketch cpcSketch = aggregationResultHolder.getResult();
+        CpcUnion union = new CpcUnion(_lgK);
+        if (cpcSketch != null) {
+          union.update(cpcSketch);
+        }
+        for (int i = 0; i < length; i++) {
+          
union.update(ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]));
+        }
+        aggregationResultHolder.setValue(union.getResult());
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0, 
length);
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the CpcSketch
+    CpcSketch cpcSketch = getCpcSketch(aggregationResultHolder);
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          cpcSketch.update(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          cpcSketch.update(longValues[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          cpcSketch.update(floatValues[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          cpcSketch.update(doubleValues[i]);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          cpcSketch.update(stringValues[i]);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized CPC Sketch
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          CpcSketch value = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
+          int groupKey = groupKeyArray[i];
+          CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
+          if (cpcSketch != null) {
+            CpcUnion union = new CpcUnion(_lgK);
+            union.update(cpcSketch);
+            union.update(value);
+            groupByResultHolder.setValueForKey(groupKey, union.getResult());
+          } else {
+            groupByResultHolder.setValueForKey(groupKey, value);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], 
dictionary).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the CpcSketch
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getCpcSketch(groupByResultHolder, 
groupKeyArray[i]).update(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getCpcSketch(groupByResultHolder, 
groupKeyArray[i]).update(longValues[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getCpcSketch(groupByResultHolder, 
groupKeyArray[i]).update(floatValues[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getCpcSketch(groupByResultHolder, 
groupKeyArray[i]).update(doubleValues[i]);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getCpcSketch(groupByResultHolder, 
groupKeyArray[i]).update(stringValues[i]);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized CPC Sketch
+    DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          CpcSketch value = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytesValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
+            if (cpcSketch != null) {
+              CpcUnion union = new CpcUnion(_lgK);
+              union.update(cpcSketch);
+              union.update(value);
+              groupByResultHolder.setValueForKey(groupKey, union.getResult());
+            } else {
+              groupByResultHolder.setValueForKey(groupKey, value);
+            }
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging CPC 
sketches", e);
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i], 
dictionary, dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store values into the CpcSketch
+    switch (storedType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getCpcSketch(groupByResultHolder, groupKey).update(intValues[i]);
+          }
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getCpcSketch(groupByResultHolder, groupKey).update(longValues[i]);
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getCpcSketch(groupByResultHolder, groupKey).update(floatValues[i]);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getCpcSketch(groupByResultHolder, 
groupKey).update(doubleValues[i]);
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getCpcSketch(groupByResultHolder, 
groupKey).update(stringValues[i]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException("Illegal data type for 
DISTINCT_COUNT_CPC aggregation function: " + storedType);
+    }
+  }
+
+  @Override
+  public CpcSketch extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    Object result = aggregationResultHolder.getResult();
+    if (result == null) {
+      return new CpcSketch(_lgK);
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to CpcSketch
+      return convertToCpcSketch((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the CpcSketch
+      return (CpcSketch) result;
+    }
+  }
+
+  @Override
+  public CpcSketch extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    Object result = groupByResultHolder.getResult(groupKey);
+    if (result == null) {
+      return new CpcSketch(_lgK);
+    }
+
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to CpcSketch
+      return convertToCpcSketch((DictIdsWrapper) result);
+    } else {
+      // For non-dictionary-encoded expression, directly return the CpcSketch
+      return (CpcSketch) result;
+    }
+  }
+
+  @Override
+  public CpcSketch merge(CpcSketch intermediateResult1, CpcSketch 
intermediateResult2) {
+    if (intermediateResult1 == null && intermediateResult2 != null) {
+      return intermediateResult2;
+    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+      return intermediateResult1;
+    } else if (intermediateResult1 == null) {
+      return new CpcSketch(_lgK);
+    }
+
+    CpcUnion union = new CpcUnion(_lgK);
+    union.update(intermediateResult1);
+    union.update(intermediateResult2);
+    return union.getResult();
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public Comparable extractFinalResult(CpcSketch intermediateResult) {
+    return Math.round(intermediateResult.getEstimate());
+  }
+
+  /**
+   * Returns the dictionary id bitmap from the result holder or creates a new 
one if it does not exist.
+   */
+  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder 
aggregationResultHolder,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      aggregationResultHolder.setValue(dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Returns the CpcSketch from the result holder or creates a new one if it 
does not exist.
+   */
+  protected CpcSketch getCpcSketch(AggregationResultHolder 
aggregationResultHolder) {
+    CpcSketch cpcSketch = aggregationResultHolder.getResult();
+    if (cpcSketch == null) {
+      cpcSketch = new CpcSketch(_lgK);
+      aggregationResultHolder.setValue(cpcSketch);
+    }
+    return cpcSketch;
+  }
+
+  /**
+   * Returns the dictionary id bitmap for the given group key or creates a new 
one if it does not exist.
+   */
+  protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder 
groupByResultHolder, int groupKey,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Returns the CpcSketch for the given group key or creates a new one if it 
does not exist.
+   */
+  protected CpcSketch getCpcSketch(GroupByResultHolder groupByResultHolder, 
int groupKey) {
+    CpcSketch cpcSketch = groupByResultHolder.getResult(groupKey);
+    if (cpcSketch == null) {
+      cpcSketch = new CpcSketch(_lgK);
+      groupByResultHolder.setValueForKey(groupKey, cpcSketch);
+    }
+    return cpcSketch;
+  }
+
+  /**
+   * Helper method that adds the dictionary id to the bitmap of every given group key in the result holder.
+   */
+  private static void setDictIdForGroupKeys(GroupByResultHolder 
groupByResultHolder, int[] groupKeys,
+      Dictionary dictionary, int dictId) {
+    for (int groupKey : groupKeys) {
+      getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
+    }
+  }
+
+  private CpcSketch convertToCpcSketch(DictIdsWrapper dictIdsWrapper) {
+    CpcSketch cpcSketch = new CpcSketch(_lgK);
+    Dictionary dictionary = dictIdsWrapper._dictionary;
+    RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
+    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+    while (iterator.hasNext()) {
+      Object value = dictionary.get(iterator.next());
+      addObjectToSketch(value, cpcSketch);
+    }
+    return cpcSketch;
+  }
+
+  private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
+    if (rawValue instanceof String) {
+      sketch.update((String) rawValue);
+    } else if (rawValue instanceof Integer) {
+      sketch.update((Integer) rawValue);
+    } else if (rawValue instanceof Long) {
+      sketch.update((Long) rawValue);
+    } else if (rawValue instanceof Double) {
+      sketch.update((Double) rawValue);
+    } else if (rawValue instanceof Float) {
+      sketch.update((Float) rawValue);
+    } else if (rawValue instanceof Object[]) {
+      addObjectsToSketch((Object[]) rawValue, sketch);
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + 
rawValue.getClass().getSimpleName());
+    }
+  }
+
+  private void addObjectsToSketch(Object[] rawValues, CpcSketch sketch) {
+    if (rawValues instanceof String[]) {
+      for (String s : (String[]) rawValues) {
+        sketch.update(s);
+      }
+    } else if (rawValues instanceof Integer[]) {
+      for (Integer i : (Integer[]) rawValues) {
+        sketch.update(i);
+      }
+    } else if (rawValues instanceof Long[]) {
+      for (Long l : (Long[]) rawValues) {
+        sketch.update(l);
+      }
+    } else if (rawValues instanceof Double[]) {
+      for (Double d : (Double[]) rawValues) {
+        sketch.update(d);
+      }
+    } else if (rawValues instanceof Float[]) {
+      for (Float f : (Float[]) rawValues) {
+        sketch.update(f);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + 
rawValues.getClass().getSimpleName());
+    }
+  }
+
+  private static final class DictIdsWrapper {
+    final Dictionary _dictionary;
+    final RoaringBitmap _dictIdBitmap;
+
+    private DictIdsWrapper(Dictionary dictionary) {
+      _dictionary = dictionary;
+      _dictIdBitmap = new RoaringBitmap();
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
new file mode 100644
index 0000000000..ab153c8835
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawCPCSketchAggregationFunction.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedCPCSketch;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code DistinctCountRawCPCSketchAggregationFunction} shares the same usage as the
+ * {@link DistinctCountCPCSketchAggregationFunction}, and returns the sketch as a Base64-encoded string.
+ */
+public class DistinctCountRawCPCSketchAggregationFunction extends 
DistinctCountCPCSketchAggregationFunction {
+
+  public DistinctCountRawCPCSketchAggregationFunction(List<ExpressionContext> 
arguments) {
+    super(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWCPCSKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public SerializedCPCSketch extractFinalResult(CpcSketch sketch) {
+    return new SerializedCPCSketch(sketch);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
similarity index 53%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 62a350a367..82e9a74161 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -18,26 +18,34 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
-import org.apache.datasketches.theta.Sketch;
-import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
-public class DistinctCountThetaSketchAggregator implements ValueAggregator {
+public class DistinctCountCPCSketchAggregator implements ValueAggregator {
 
-  private final Union _union;
-
-  public DistinctCountThetaSketchAggregator() {
-    // TODO: Handle configurable nominal entries
-    _union = 
Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion();
+  public DistinctCountCPCSketchAggregator() {
   }
 
   @Override
   public Object aggregate(Object value1, Object value2) {
-    Sketch first = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[]) 
value1);
-    Sketch second = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[]) 
value2);
-    Sketch result = _union.union(first, second);
-    return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(result);
+    CpcSketch first = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
+    CpcSketch second = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
+    CpcSketch result;
+    if (first == null && second == null) {
+      result = new CpcSketch(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    } else if (second == null) {
+      result = first;
+    } else if (first == null) {
+      result = second;
+    } else {
+      CpcUnion union = new 
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+      union.update(first);
+      union.update(second);
+      result = union.getResult();
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(result);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index 62a350a367..b11f7d7b00 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -35,9 +35,9 @@ public class DistinctCountThetaSketchAggregator implements 
ValueAggregator {
 
   @Override
   public Object aggregate(Object value1, Object value2) {
-    Sketch first = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[]) 
value1);
-    Sketch second = ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize((byte[]) 
value2);
+    Sketch first = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
+    Sketch second = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
     Sketch result = _union.union(first, second);
-    return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(result);
+    return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index cd5388870d..26182d2c74 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -55,6 +55,9 @@ public class ValueAggregatorFactory {
       case SUMVALUESINTEGERSUMTUPLESKETCH:
       case AVGVALUEINTEGERSUMTUPLESKETCH:
         return new IntegerTupleSketchAggregator(IntegerSummary.Mode.Sum);
+      case DISTINCTCOUNTCPCSKETCH:
+      case DISTINCTCOUNTRAWCPCSKETCH:
+        return new DistinctCountCPCSketchAggregator();
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + 
aggregationType);
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index ccd9ae3f6f..8116a29373 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.datasketches.cpc.CpcSketch;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -374,4 +375,21 @@ public class ObjectSerDeUtilsTest {
       assertEquals(actual, expected, ERROR_MESSAGE);
     }
   }
+
+  @Test
+  public void testCpcSketch() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      CpcSketch sketch = new CpcSketch();
+      int size = RANDOM.nextInt(100) + 1;
+      for (int j = 0; j < size; j++) {
+        sketch.update(RANDOM.nextLong());
+      }
+
+      byte[] bytes = ObjectSerDeUtils.serialize(sketch);
+      CpcSketch actual =
+          ObjectSerDeUtils.deserialize(bytes, 
ObjectSerDeUtils.ObjectType.CompressedProbabilisticCounting);
+
+      assertEquals(actual.getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index b62d363c4f..039ba01018 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -27,7 +27,7 @@ import org.testng.annotations.Test;
 public class SketchFunctionsTest {
 
   private double thetaEstimate(byte[] bytes) {
-    return 
ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize(bytes).getEstimate();
+    return 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(bytes).getEstimate();
   }
 
   byte[] _bytes = {1, 2, 3};
@@ -78,4 +78,20 @@ public class SketchFunctionsTest {
     Assert.assertThrows(IllegalArgumentException.class,
         () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1, 1024));
   }
+
+  private double cpcEstimate(byte[] bytes) {
+    return 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytes).getEstimate();
+  }
+
+  @Test
+  public void testCpcCreation() {
+    for (Object i : _inputs) {
+      Assert.assertEquals(cpcEstimate(SketchFunctions.toCpcSketch(i)), 1.0);
+      Assert.assertEquals(cpcEstimate(SketchFunctions.toCpcSketch(i, 11)), 
1.0);
+    }
+    Assert.assertEquals(cpcEstimate(SketchFunctions.toCpcSketch(null)), 0.0);
+    Assert.assertEquals(cpcEstimate(SketchFunctions.toCpcSketch(null, 11)), 
0.0);
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
SketchFunctions.toCpcSketch(new Object()));
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
SketchFunctions.toCpcSketch(new Object(), 11));
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
new file mode 100644
index 0000000000..3732d3553b
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountCPCSketchStarTreeV2Test.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Collections;
+import java.util.Random;
+import org.apache.datasketches.cpc.CpcSketch;
+import 
org.apache.pinot.segment.local.aggregator.DistinctCountCPCSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DistinctCountCPCSketchStarTreeV2Test extends 
BaseStarTreeV2Test<Object, CpcSketch> {
+
+  @Override
+  ValueAggregator<Object, CpcSketch> getValueAggregator() {
+    return new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random random) {
+    return random.nextInt(100);
+  }
+
+  @Override
+  void assertAggregatedValue(CpcSketch starTreeResult, CpcSketch 
nonStarTreeResult) {
+    assertEquals((long) starTreeResult.getEstimate(), (long) 
nonStarTreeResult.getEstimate());
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
index 35fc8127bc..4d04539b4a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
@@ -299,7 +299,7 @@ public class DistinctCountThetaSketchQueriesTest extends 
BaseQueriesTest {
     String query = "SELECT DISTINCT_COUNT_RAW_THETA_SKETCH(intSVColumn) FROM 
testTable";
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     String serializedSketch = (String) 
brokerResponse.getResultTable().getRows().get(0)[0];
-    Sketch sketch = 
ObjectSerDeUtils.DATA_SKETCH_SER_DE.deserialize(Base64.getDecoder().decode(serializedSketch));
+    Sketch sketch = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(Base64.getDecoder().decode(serializedSketch));
     assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
new file mode 100644
index 0000000000..94008c404e
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CpcSketchTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class CpcSketchTest extends CustomDataQueryClusterIntegrationTest {
+
+  private static final String DEFAULT_TABLE_NAME = "CpcSketchTest";
+  private static final String ID = "id";
+  private static final String MET_CPC_SKETCH_BYTES = "metCpcSketchBytes";
+
+  @Override
+  protected long getCountStarResult() {
+    return 1000;
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = String.format("SELECT DISTINCT_COUNT_CPC_SKETCH(%s), 
DISTINCT_COUNT_RAW_CPC_SKETCH(%s) FROM %s",
+        MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, getTableName());
+    JsonNode jsonNode = postQuery(query);
+    long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+    byte[] rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+    CpcSketch deserializedSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+
+    assertTrue(distinctCount > 0);
+    assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testCpcUnionQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    for (int i = 0; i < 10; i++) {
+      String query = "SELECT " + 
"DISTINCT_COUNT_CPC_SKETCH(metCpcSketchBytes), "
+          + 
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes)) " + 
"FROM " + getTableName()
+          + " WHERE id=" + i;
+      JsonNode jsonNode = postQuery(query);
+      long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+      
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(), 
distinctCount);
+      query = "SELECT " + 
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) "
+          + "FILTER (WHERE id = " + i + ")) " + "FROM " + getTableName();
+      jsonNode = postQuery(query);
+      
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), 
distinctCount);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        // Query Type 1:
+        String query = "SELECT DISTINCT_COUNT_CPC_SKETCH(metCpcSketchBytes), "
+            + 
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes)) " + 
"FROM " + getTableName()
+            + " WHERE id=" + i + " OR id=" + j;
+        JsonNode jsonNode = postQuery(query);
+        long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+        
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(1).asLong(), 
distinctCount);
+
+        // Query Type 2:
+        query = "SELECT " + 
"GET_CPC_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) "
+            + "FILTER (WHERE id = " + i + " OR id = " + j + ")) " + "FROM " + 
getTableName();
+        jsonNode = postQuery(query);
+        
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), 
distinctCount);
+
+        // Query Type 3:
+        query = "SELECT " + "GET_CPC_SKETCH_ESTIMATE(CPC_SKETCH_UNION( "
+            + "DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) FILTER (WHERE 
id = " + i + "),"
+            + "DISTINCT_COUNT_RAW_CPC_SKETCH(metCpcSketchBytes) FILTER (WHERE 
id = " + j + ")))" + " FROM "
+            + getTableName();
+
+        jsonNode = postQuery(query);
+        
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), 
distinctCount);
+      }
+    }
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testUnionWithSketchQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+
+    String query = String.format(
+        "SELECT " + "DISTINCT_COUNT_CPC_SKETCH(%s), " + 
"DISTINCT_COUNT_RAW_CPC_SKETCH(%s) " + "FROM " + "("
+            + "SELECT %s FROM %s WHERE %s = 4 " + "UNION ALL " + "SELECT %s 
FROM %s WHERE %s = 5 " + "UNION ALL "
+            + "SELECT %s FROM %s WHERE %s = 6 " + "UNION ALL " + "SELECT %s 
FROM %s WHERE %s = 7 " + ")",
+        MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, 
getTableName(), ID, MET_CPC_SKETCH_BYTES,
+        getTableName(), ID, MET_CPC_SKETCH_BYTES, getTableName(), ID, 
MET_CPC_SKETCH_BYTES, getTableName(), ID);
+    JsonNode jsonNode = postQuery(query);
+    long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+    byte[] rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+    CpcSketch deserializedSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+
+    assertTrue(distinctCount > 0);
+    assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+  }
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testJoinWithSketchQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query = String.format(
+        "SELECT " + "DISTINCT_COUNT_CPC_SKETCH(a.%s), " + 
"DISTINCT_COUNT_RAW_CPC_SKETCH(a.%s), "
+            + "DISTINCT_COUNT_CPC_SKETCH(b.%s), " + 
"DISTINCT_COUNT_RAW_CPC_SKETCH(b.%s) " + "FROM "
+            + "(SELECT * FROM %s WHERE %s < 8 ) a " + "JOIN " + "(SELECT * 
FROM %s WHERE %s > 3 ) b "
+            + "ON a.%s = b.%s", MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES, 
MET_CPC_SKETCH_BYTES, MET_CPC_SKETCH_BYTES,
+        getTableName(), ID, getTableName(), ID, ID, ID);
+    JsonNode jsonNode = postQuery(query);
+    long distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+    byte[] rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(1).asText());
+    CpcSketch deserializedSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+    assertTrue(distinctCount > 0);
+    assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+
+    distinctCount = 
jsonNode.get("resultTable").get("rows").get(0).get(2).asLong();
+    rawSketchBytes = 
Base64.getDecoder().decode(jsonNode.get("resultTable").get("rows").get(0).get(3).asText());
+    deserializedSketch = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(rawSketchBytes);
+    assertTrue(distinctCount > 0);
+    assertEquals(Math.round(deserializedSketch.getEstimate()), distinctCount);
+  }
+
+  @Override
+  public String getTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new 
Schema.SchemaBuilder().setSchemaName(getTableName()).addSingleValueDimension(ID,
 FieldSpec.DataType.INT)
+        .addMetric(MET_CPC_SKETCH_BYTES, FieldSpec.DataType.BYTES).build();
+  }
+
+  @Override
+  public File createAvroFile()
+      throws Exception {
+    // create avro schema
+    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(ID, 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null,
+            null), new org.apache.avro.Schema.Field(MET_CPC_SKETCH_BYTES,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES), 
null, null)));
+
+    // create avro file
+    File avroFile = new File(_tempDir, "data.avro");
+    try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, avroFile);
+      for (int i = 0; i < getCountStarResult(); i++) {
+        // create avro record
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(ID, RandomUtils.nextInt(10));
+        record.put(MET_CPC_SKETCH_BYTES, ByteBuffer.wrap(getRandomRawValue()));
+        // add avro record to file
+        fileWriter.append(record);
+      }
+    }
+    return avroFile;
+  }
+
+  private byte[] getRandomRawValue() {
+    CpcSketch sketch = new CpcSketch(4);
+    sketch.update(RANDOM.nextInt(100));
+    return ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(sketch);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
new file mode 100644
index 0000000000..7ac3090188
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.List;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class DistinctCountCPCSketchValueAggregator implements 
ValueAggregator<Object, CpcSketch> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  private final int _lgK;
+
+  private int _maxByteSize;
+
+  public DistinctCountCPCSketchValueAggregator(List<ExpressionContext> 
arguments) {
+    // With at most one argument (no explicit lgK), fall back to the default lgK
+    if (arguments.size() <= 1) {
+      _lgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
+    } else {
+      _lgK = arguments.get(1).getLiteral().getIntValue();
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTCPCSKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  @Override
+  public CpcSketch getInitialAggregatedValue(Object rawValue) {
+    CpcSketch initialValue;
+    if (rawValue instanceof byte[]) { // Serialized Sketch
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+    } else if (rawValue instanceof byte[][]) { // Multiple Serialized Sketches
+      byte[][] serializedSketches = (byte[][]) rawValue;
+      CpcUnion union = new CpcUnion(_lgK);
+      for (byte[] bytes : serializedSketches) {
+        union.update(deserializeAggregatedValue(bytes));
+      }
+      initialValue = union.getResult();
+      updateMaxByteSize(initialValue);
+    } else {
+      initialValue = empty();
+      addObjectToSketch(rawValue, initialValue);
+      updateMaxByteSize(initialValue);
+    }
+    return initialValue;
+  }
+
+  @Override
+  public CpcSketch applyRawValue(CpcSketch value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      CpcSketch sketch = union(value, deserializeAggregatedValue(bytes));
+      updateMaxByteSize(sketch);
+      return sketch;
+    } else {
+      addObjectToSketch(rawValue, value);
+      updateMaxByteSize(value);
+      return value;
+    }
+  }
+
+  @Override
+  public CpcSketch applyAggregatedValue(CpcSketch value, CpcSketch 
aggregatedValue) {
+    CpcSketch result = union(value, aggregatedValue);
+    updateMaxByteSize(result);
+    return result;
+  }
+
+  @Override
+  public CpcSketch cloneAggregatedValue(CpcSketch value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    // NOTE: For aggregated metrics, the initial aggregated value might not have been generated yet; in that case
+    //       return the maximum serialized size for the configured lgK.
+    return _maxByteSize > 0 ? _maxByteSize : 
CpcSketch.getMaxSerializedBytes(_lgK);
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(CpcSketch value) {
+    return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(value);
+  }
+
+  @Override
+  public CpcSketch deserializeAggregatedValue(byte[] bytes) {
+    return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize(bytes);
+  }
+
+  private CpcSketch union(CpcSketch left, CpcSketch right) {
+    if (left == null && right == null) {
+      return empty();
+    } else if (left == null) {
+      return right;
+    } else if (right == null) {
+      return left;
+    }
+
+    CpcUnion union = new CpcUnion(_lgK);
+    union.update(left);
+    union.update(right);
+    return union.getResult();
+  }
+
+  private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
+    if (rawValue instanceof String) {
+      sketch.update((String) rawValue);
+    } else if (rawValue instanceof Integer) {
+      sketch.update((Integer) rawValue);
+    } else if (rawValue instanceof Long) {
+      sketch.update((Long) rawValue);
+    } else if (rawValue instanceof Double) {
+      sketch.update((Double) rawValue);
+    } else if (rawValue instanceof Float) {
+      sketch.update((Float) rawValue);
+    } else if (rawValue instanceof Object[]) {
+      addObjectsToSketch((Object[]) rawValue, sketch);
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + 
rawValue.getClass().getSimpleName());
+    }
+  }
+
+  private void addObjectsToSketch(Object[] rawValues, CpcSketch sketch) {
+    if (rawValues instanceof String[]) {
+      for (String s : (String[]) rawValues) {
+        sketch.update(s);
+      }
+    } else if (rawValues instanceof Integer[]) {
+      for (Integer i : (Integer[]) rawValues) {
+        sketch.update(i);
+      }
+    } else if (rawValues instanceof Long[]) {
+      for (Long l : (Long[]) rawValues) {
+        sketch.update(l);
+      }
+    } else if (rawValues instanceof Double[]) {
+      for (Double d : (Double[]) rawValues) {
+        sketch.update(d);
+      }
+    } else if (rawValues instanceof Float[]) {
+      for (Float f : (Float[]) rawValues) {
+        sketch.update(f);
+      }
+    } else {
+      throw new IllegalStateException(
+          "Unsupported data type for CPC Sketch aggregation: " + 
rawValues.getClass().getSimpleName());
+    }
+  }
+
+  private void updateMaxByteSize(CpcSketch sketch) {
+    if (sketch != null) {
+      _maxByteSize = Math.max(_maxByteSize, sketch.toByteArray().length);
+    }
+  }
+
+  private CpcSketch empty() {
+    return new CpcSketch(_lgK);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
index 9b2a428c3b..f36f9a00e9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
@@ -168,11 +168,11 @@ public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<
 
   @Override
   public byte[] serializeAggregatedValue(Sketch value) {
-    return CustomSerDeUtils.DATA_SKETCH_SER_DE.serialize(value);
+    return CustomSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(value);
   }
 
   @Override
   public Sketch deserializeAggregatedValue(byte[] bytes) {
-    return CustomSerDeUtils.DATA_SKETCH_SER_DE.deserialize(bytes);
+    return CustomSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize(bytes);
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 16dc0328f7..18c5ac96f6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -78,6 +78,9 @@ public class ValueAggregatorFactory {
       case AVGVALUEINTEGERSUMTUPLESKETCH:
       case SUMVALUESINTEGERSUMTUPLESKETCH:
         return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+      case DISTINCTCOUNTCPCSKETCH:
+      case DISTINCTCOUNTRAWCPCSKETCH:
+        return new DistinctCountCPCSketchValueAggregator(arguments);
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + 
aggregationType);
     }
@@ -127,6 +130,9 @@ public class ValueAggregatorFactory {
       case AVGVALUEINTEGERSUMTUPLESKETCH:
       case SUMVALUESINTEGERSUMTUPLESKETCH:
         return IntegerTupleSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+      case DISTINCTCOUNTCPCSKETCH:
+      case DISTINCTCOUNTRAWCPCSKETCH:
+        return DistinctCountCPCSketchValueAggregator.AGGREGATED_VALUE_TYPE;
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + 
aggregationType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java
new file mode 100644
index 0000000000..52dbf4f414
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedCPCSketch.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import org.apache.datasketches.cpc.CpcSketch;
+
+
+/**
+ * Serialized and comparable version of CPC Sketch.
+ * Ordering is defined by the cardinality estimate and not the size
+ * of the underlying sketch.
+ */
+public class SerializedCPCSketch implements Comparable<SerializedCPCSketch> {
+  private final CpcSketch _sketch;
+
+  public SerializedCPCSketch(CpcSketch sketch) {
+    _sketch = sketch;
+  }
+
+  @Override
+  public int compareTo(SerializedCPCSketch other) {
+    return Double.compare(_sketch.getEstimate(), other._sketch.getEstimate());
+  }
+
+  @Override
+  public String toString() {
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index f5a6275a3b..b1515c5b41 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -25,6 +25,7 @@ import com.tdunning.math.stats.MergingDigest;
 import com.tdunning.math.stats.TDigest;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -237,7 +238,7 @@ public class CustomSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<Sketch> DATA_SKETCH_SER_DE = new ObjectSerDe<Sketch>() {
+  public static final ObjectSerDe<Sketch> DATA_SKETCH_THETA_SER_DE = new ObjectSerDe<Sketch>() {
 
     @Override
     public byte[] serialize(Sketch value) {
@@ -281,6 +282,26 @@ public class CustomSerDeUtils {
         }
       };
 
+  public static final ObjectSerDe<CpcSketch> DATA_SKETCH_CPC_SER_DE = new 
ObjectSerDe<CpcSketch>() {
+
+    @Override
+    public byte[] serialize(CpcSketch value) {
+      return value.toByteArray();
+    }
+
+    @Override
+    public CpcSketch deserialize(byte[] bytes) {
+      return CpcSketch.heapify(Memory.wrap(bytes));
+    }
+
+    @Override
+    public CpcSketch deserialize(ByteBuffer byteBuffer) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return CpcSketch.heapify(Memory.wrap(bytes));
+    }
+  };
+
   public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new 
ObjectSerDe<RoaringBitmap>() {
 
     @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 71e47c7c62..5065d1b186 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -213,8 +213,7 @@ public final class TableConfigUtils {
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
     String schemaName = tableConfig.getValidationConfig().getSchemaName();
     if (schemaName != null && !schemaName.equals(rawTableName)) {
-      throw new IllegalStateException(
-          "Schema name: " + schemaName + " does not match table name: " + rawTableName);
+      throw new IllegalStateException("Schema name: " + schemaName + " does not match table name: " + rawTableName);
     }
   }
 
@@ -576,7 +575,8 @@ public final class TableConfigUtils {
   public final static EnumSet<AggregationFunctionType> 
AVAILABLE_CORE_VALUE_AGGREGATORS =
       EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTTHETASKETCH,
           DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, 
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
-          SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, 
DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS);
+          SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, 
DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS,
+          DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH);
 
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
@@ -626,8 +626,8 @@ public final class TableConfigUtils {
                   throw new IllegalArgumentException("ValueAggregator not 
enabled for type: " + aft.toString());
                 }
               } catch (IllegalArgumentException e) {
-                String err = String.format(
-                    "Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+                String err =
+                    String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
                 throw new IllegalStateException(err);
               }
             }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
new file mode 100644
index 0000000000..b8dcb701f5
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.Collections;
+import java.util.stream.IntStream;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.cpc.CpcUnion;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class DistinctCountCPCSketchValueAggregatorTest {
+
+  @Test
+  public void initialShouldCreateSingleItemSketch() {
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    assertEquals(agg.getInitialAggregatedValue("hello world").getEstimate(), 
1.0);
+  }
+
+  @Test
+  public void initialShouldParseASketch() {
+    CpcSketch input = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input::update);
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[] bytes = agg.serializeAggregatedValue(input);
+    assertEquals(agg.getInitialAggregatedValue(bytes).getEstimate(), 
input.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), 
input.toByteArray().length);
+  }
+
+  @Test
+  public void initialShouldParseMultiValueSketches() {
+    CpcSketch input1 = new CpcSketch();
+    input1.update("hello");
+    CpcSketch input2 = new CpcSketch();
+    input2.update("world");
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[][] bytes = {agg.serializeAggregatedValue(input1), 
agg.serializeAggregatedValue(input2)};
+    
assertEquals(Math.round(agg.getInitialAggregatedValue(bytes).getEstimate()), 2);
+  }
+
+  @Test
+  public void applyAggregatedValueShouldUnion() {
+    CpcSketch input1 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input1::update);
+    CpcSketch input2 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input2::update);
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    CpcSketch result = agg.applyAggregatedValue(input1, input2);
+
+    CpcUnion union = new 
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    union.update(input1);
+    union.update(input2);
+    CpcSketch merged = union.getResult();
+
+    assertEquals(result.getEstimate(), merged.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), 
merged.toByteArray().length);
+  }
+
+  @Test
+  public void applyRawValueShouldUnion() {
+    CpcSketch input1 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input1::update);
+    CpcSketch input2 = new CpcSketch();
+    IntStream.range(0, 1000).forEach(input2::update);
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    byte[] result2bytes = agg.serializeAggregatedValue(input2);
+    CpcSketch result = agg.applyRawValue(input1, result2bytes);
+
+    CpcUnion union = new 
CpcUnion(CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK);
+    union.update(input1);
+    union.update(input2);
+    CpcSketch merged = union.getResult();
+
+    assertEquals(result.getEstimate(), merged.getEstimate());
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), 
merged.toByteArray().length);
+  }
+
+  @Test
+  public void applyRawValueShouldAdd() {
+    CpcSketch input1 = new CpcSketch();
+    input1.update("hello".hashCode());
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    CpcSketch result = agg.applyRawValue(input1, "world");
+    assertEquals(Math.round(result.getEstimate()), 2);
+
+    CpcSketch pristine = new CpcSketch();
+    pristine.update("hello");
+    pristine.update("world");
+    assertEquals(agg.getMaxAggregatedValueByteSize(), 
pristine.toByteArray().length);
+  }
+
+  @Test
+  public void applyRawValueShouldSupportMultiValue() {
+    CpcSketch input1 = new CpcSketch();
+    input1.update("hello");
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    String[] strings = {"hello", "world", "this", "is", "some", "strings"};
+    CpcSketch result = agg.applyRawValue(input1, strings);
+
+    assertEquals(Math.round(result.getEstimate()), 6);
+
+    CpcSketch pristine = new CpcSketch();
+    for (String value : strings) {
+      pristine.update(value);
+    }
+    assertEquals(agg.getMaxAggregatedValueByteSize(), 
pristine.toByteArray().length);
+  }
+
+  @Test
+  public void getInitialValueShouldSupportDifferentTypes() {
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    assertEquals(agg.getInitialAggregatedValue(12345).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12345L).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12.345f).getEstimate(), 1.0);
+    assertEquals(agg.getInitialAggregatedValue(12.345d).getEstimate(), 1.0);
+    assertThrows(() -> agg.getInitialAggregatedValue(new Object()));
+  }
+
+  @Test
+  public void getInitialValueShouldSupportMultiValueTypes() {
+    DistinctCountCPCSketchValueAggregator agg = new 
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    Integer[] ints = {12345};
+    assertEquals(agg.getInitialAggregatedValue(ints).getEstimate(), 1.0);
+    Long[] longs = {12345L};
+    assertEquals(agg.getInitialAggregatedValue(longs).getEstimate(), 1.0);
+    Float[] floats = {12.345f};
+    assertEquals(agg.getInitialAggregatedValue(floats).getEstimate(), 1.0);
+    Double[] doubles = {12.345d};
+    assertEquals(agg.getInitialAggregatedValue(doubles).getEstimate(), 1.0);
+    Object[] objects = {new Object()};
+    assertThrows(() -> agg.getInitialAggregatedValue(objects));
+    byte[][] zeroSketches = {};
+    assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(), 
0.0);
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index b9aaf8ee86..5a7c127e54 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -69,17 +69,14 @@ public enum AggregationFunctionType {
   FIRSTWITHTIME("firstWithTime", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.or(
           OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
-          OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))
-      ),
+          OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))),
       ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
-  LASTWITHTIME("lastWithTime", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.or(
-          OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
-          OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))
-      ),
+  LASTWITHTIME("lastWithTime", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.or(
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER)),
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.NUMERIC, SqlTypeFamily.CHARACTER))),
+      ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  MINMAXRANGE("minMaxRange", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
       ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
-  MINMAXRANGE("minMaxRange", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC,
-      OperandTypes.NUMERIC, ReturnTypes.ARG0, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
   /**
    * for all distinct count family functions:
    * (1) distinct_count only supports single argument;
@@ -107,20 +104,19 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
       ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
-  @Deprecated
-  FASTHLL("fastHLL"),
-  DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null,
-      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  @Deprecated FASTHLL("fastHLL"),
+  DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch", null, 
SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
       ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
-  DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch", null,
-      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch", null, 
SqlKind.OTHER_FUNCTION,
+      SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
       ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
-  DISTINCTSUM("distinctSum", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC,
-      OperandTypes.NUMERIC, ReturnTypes.AGG_SUM, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
-  DISTINCTAVG("distinctAvg", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC,
-      OperandTypes.NUMERIC, ReturnTypes.explicit(SqlTypeName.DOUBLE), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
+  DISTINCTSUM("distinctSum", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
+      ReturnTypes.AGG_SUM, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  DISTINCTAVG("distinctAvg", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.NUMERIC, OperandTypes.NUMERIC,
+      ReturnTypes.explicit(SqlTypeName.DOUBLE), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   PERCENTILE("percentile", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, 
SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
@@ -138,8 +134,7 @@ public enum AggregationFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, 
SqlTypeFamily.NUMERIC)), ReturnTypes.VARCHAR_2000,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
   // DEPRECATED in v2
-  @Deprecated
-  PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
+  @Deprecated PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
   PERCENTILEKLL("percentileKLL", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.NUMERIC, 
SqlTypeFamily.NUMERIC)), ReturnTypes.ARG0,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
@@ -157,8 +152,7 @@ public enum AggregationFunctionType {
       ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // DEPRECATED in v2
-  @Deprecated
-  IDSET("idSet"),
+  @Deprecated IDSET("idSet"),
 
   // TODO: support histogram requires solving ARRAY constructor and multi-function signature without optional ordinal
   HISTOGRAM("histogram"),
@@ -179,10 +173,10 @@ public enum AggregationFunctionType {
       OperandTypes.NUMERIC, ReturnTypes.DOUBLE, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
   STDDEVSAMP("stdDevSamp", Collections.emptyList(), SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.NUMERIC, ReturnTypes.DOUBLE, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
-  SKEWNESS("skewness", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.NUMERIC, ReturnTypes.DOUBLE, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
-  KURTOSIS("kurtosis", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.NUMERIC, ReturnTypes.DOUBLE, 
ReturnTypes.explicit(SqlTypeName.OTHER)),
+  SKEWNESS("skewness", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.NUMERIC,
+      ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  KURTOSIS("kurtosis", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.NUMERIC,
+      ReturnTypes.DOUBLE, ReturnTypes.explicit(SqlTypeName.OTHER)),
   FOURTHMOMENT("fourthMoment"),
 
   // DataSketches Tuple Sketch support
@@ -204,9 +198,20 @@ public enum AggregationFunctionType {
       SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION, 
OperandTypes.BINARY, ReturnTypes.BIGINT,
       ReturnTypes.explicit(SqlTypeName.OTHER)),
 
+  // Datasketches Frequent Items support
   FREQUENTSTRINGSSKETCH("frequentStringsSketch"),
   FREQUENTLONGSSKETCH("frequentLongsSketch"),
 
+  // Datasketches CPC Sketch support
+  DISTINCTCOUNTCPCSKETCH("distinctCountCPCSketch", 
ImmutableList.of("DISTINCT_COUNT_CPC_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
+      ReturnTypes.BIGINT, ReturnTypes.explicit(SqlTypeName.OTHER)),
+  DISTINCTCOUNTRAWCPCSKETCH("distinctCountRawCPCSketch", 
ImmutableList.of("DISTINCT_COUNT_RAW_CPC_SKETCH"),
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, 
SqlTypeFamily.CHARACTER), ordinal -> ordinal > 0),
+      ReturnTypes.VARCHAR_2000, ReturnTypes.explicit(SqlTypeName.OTHER)),
+
   // Geo aggregation functions
   STUNION("STUnion", ImmutableList.of("ST_UNION"), SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.BINARY, ReturnTypes.explicit(SqlTypeName.VARBINARY), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
@@ -271,35 +276,35 @@ public enum AggregationFunctionType {
       ReturnTypes.explicit(SqlTypeName.OTHER)),
 
   // boolean aggregate functions
-  BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.BOOLEAN, ReturnTypes.BOOLEAN, 
ReturnTypes.explicit(SqlTypeName.INTEGER)),
-  BOOLOR("boolOr", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.BOOLEAN, ReturnTypes.BOOLEAN, 
ReturnTypes.explicit(SqlTypeName.INTEGER)),
+  BOOLAND("boolAnd", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BOOLEAN,
+      ReturnTypes.BOOLEAN, ReturnTypes.explicit(SqlTypeName.INTEGER)),
+  BOOLOR("boolOr", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION, OperandTypes.BOOLEAN,
+      ReturnTypes.BOOLEAN, ReturnTypes.explicit(SqlTypeName.INTEGER)),
 
   // ExprMin and ExprMax
   // TODO: revisit support for ExprMin/Max count in V2, particularly plug query rewriter in the right place
   EXPRMIN("exprMin", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal -> 
ordinal > 1),
-      ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal -> 
ordinal > 1), ReturnTypes.ARG0,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
   EXPRMAX("exprMax", null, SqlKind.OTHER_FUNCTION, 
SqlFunctionCategory.USER_DEFINED_FUNCTION,
-      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal -> 
ordinal > 1),
-      ReturnTypes.ARG0, ReturnTypes.explicit(SqlTypeName.OTHER)),
+      OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY), ordinal -> 
ordinal > 1), ReturnTypes.ARG0,
+      ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  
PARENTEXPRMIN(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX 
+ EXPRMIN.getName(),
-      null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  
PARENTEXPRMIN(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX 
+ EXPRMIN.getName(), null,
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER, 
SqlTypeFamily.ANY), ordinal -> ordinal > 2),
       ReturnTypes.explicit(SqlTypeName.OTHER), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
-  
PARENTEXPRMAX(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX 
+ EXPRMAX.getName(),
-      null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  
PARENTEXPRMAX(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX 
+ EXPRMAX.getName(), null,
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER, 
SqlTypeFamily.ANY), ordinal -> ordinal > 2),
       ReturnTypes.explicit(SqlTypeName.OTHER), 
ReturnTypes.explicit(SqlTypeName.OTHER)),
 
-  CHILDEXPRMIN(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX 
+ EXPRMIN.getName(),
-      null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  CHILDEXPRMIN(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX 
+ EXPRMIN.getName(), null,
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER, 
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
       ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
-  CHILDEXPRMAX(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX 
+ EXPRMAX.getName(),
-      null, SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
+  CHILDEXPRMAX(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX 
+ EXPRMAX.getName(), null,
+      SqlKind.OTHER_FUNCTION, SqlFunctionCategory.USER_DEFINED_FUNCTION,
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.INTEGER, 
SqlTypeFamily.ANY), ordinal -> ordinal > 3),
       ReturnTypes.ARG1, ReturnTypes.explicit(SqlTypeName.OTHER)),
 
@@ -307,8 +312,9 @@ public enum AggregationFunctionType {
   // TODO: revisit support for funnel count in V2
   FUNNELCOUNT("funnelCount");
 
-  private static final Set<String> NAMES = 
Arrays.stream(values()).flatMap(func -> Stream.of(func.name(),
-      func.getName(), 
func.getName().toLowerCase())).collect(Collectors.toSet());
+  private static final Set<String> NAMES =
+      Arrays.stream(values()).flatMap(func -> Stream.of(func.name(), 
func.getName(), func.getName().toLowerCase()))
+          .collect(Collectors.toSet());
 
   // --------------------------------------------------------------------------
   // Function signature used by Calcite.
@@ -373,8 +379,7 @@ public enum AggregationFunctionType {
 
     _returnTypeInference = finalReturnType;
     _operandTypeChecker = operandTypeChecker;
-    _intermediateReturnTypeInference = intermediateReturnType == null ? 
_returnTypeInference
-        : intermediateReturnType;
+    _intermediateReturnTypeInference = intermediateReturnType == null ? 
_returnTypeInference : intermediateReturnType;
   }
 
   public String getName() {
@@ -427,9 +432,7 @@ public enum AggregationFunctionType {
 
   public static String getUnderscoreSplitAggregationFunctionName(String 
functionName) {
     // Skip functions that have numbers for now and return their name as is
-    return functionName.matches(".*\\d.*")
-        ? functionName
-        : functionName.replaceAll("(.)(\\p{Upper}+|\\d+)", "$1_$2");
+    return functionName.matches(".*\\d.*") ? functionName : 
functionName.replaceAll("(.)(\\p{Upper}+|\\d+)", "$1_$2");
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 303ad6c716..b5064fad3c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -104,6 +104,8 @@ public class CommonConstants {
 
     public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
 
+    public static final int DEFAULT_CPC_SKETCH_LGK = 12;
+
     // Whether to rewrite DistinctCount to DistinctCountBitmap
     public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = 
"enable.distinct.count.bitmap.override";
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
index 361eb0b61c..ae42127d7a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
@@ -40,8 +40,8 @@ import picocli.CommandLine;
 
 @CommandLine.Command
 public class SegmentDumpTool extends AbstractBaseCommand implements Command {
-  @CommandLine.Option(names = {"-path"}, required = true,
-      description = "Path of the folder containing the segment" + " file")
+  @CommandLine.Option(names = {"-path"}, required = true, description = "Path of the folder containing the segment"
+      + " file")
   private String _segmentDir = null;
 
   @CommandLine.Option(names = {"-columns"}, arity = "1..*", description = 
"Columns to dump")
@@ -50,8 +50,8 @@ public class SegmentDumpTool extends AbstractBaseCommand 
implements Command {
   @CommandLine.Option(names = {"-dumpStarTree"})
   private boolean _dumpStarTree = false;
 
-  @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, usageHelp = true,
-      description = "Print this message.")
+  @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, usageHelp = true, description =
+      "Print this message.")
   private boolean _help = false;
 
   private void dump()
@@ -94,6 +94,15 @@ public class SegmentDumpTool extends AbstractBaseCommand 
implements Command {
     }
   }
 
+  // Adds custom output formatting depending on the type of value
+  private void printRowValue(Object value) {
+    if (value instanceof byte[]) {
+      System.out.printf("%s bytes", ((byte[]) value).length);
+    } else {
+      System.out.print(value);
+    }
+  }
+
   private void dumpRows(PinotSegmentRecordReader reader, GenericRow reuse, 
Set<String> mvColumns) {
     int docId = 0;
 
@@ -103,7 +112,7 @@ public class SegmentDumpTool extends AbstractBaseCommand 
implements Command {
 
       for (String columnName : _columnNames) {
         if (!mvColumns.contains(columnName)) {
-          System.out.print(row.getValue(columnName));
+          printRowValue(row.getValue(columnName));
           System.out.print("\t");
         } else {
           Object[] values = (Object[]) row.getValue(columnName);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to