This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch cluster_config_log2m
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit fec0e54e7622770cbf0ea9ce9fa63e610963037a
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Jun 14 04:26:57 2020 -0700

    Adding support to configure log2m value for hyperloglog
---
 .../broker/broker/helix/HelixBrokerStarter.java    | 10 +++-
 .../requesthandler/BaseBrokerRequestHandler.java   | 62 ++++++++++++++++++++++
 .../apache/pinot/common/utils/CommonConstants.java |  3 ++
 .../helix/core/util/HelixSetupUtils.java           |  1 +
 .../pinot/controller/helix/ControllerTest.java     |  2 +
 .../DistinctCountHLLValueAggregator.java           |  1 +
 .../function/AggregationFunctionFactory.java       |  8 +--
 .../DistinctCountHLLAggregationFunction.java       | 52 +++++++++++-------
 .../DistinctCountHLLMVAggregationFunction.java     | 27 +++++-----
 .../DistinctCountRawHLLAggregationFunction.java    |  5 +-
 .../DistinctCountRawHLLMVAggregationFunction.java  |  5 +-
 .../tests/OfflineClusterIntegrationTest.java       | 28 ++++++++++
 12 files changed, 163 insertions(+), 41 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 822b37e..5bee98b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -191,11 +191,19 @@ public class HelixBrokerStarter implements 
ServiceStartable {
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
     Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays
         .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
-            Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE));
+            Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M));
     if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) 
|| Boolean
         
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) {
       _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true);
     }
+    String log2mStr = configMap.get(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M);
+    if (log2mStr != null) {
+      try {
+        _brokerConf.setProperty(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, 
Integer.parseInt(log2mStr));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Unable to set Log2M value {} for DistinctCountHLL 
function. {}", log2mStr, e);
+      }
+    }
     if 
(Boolean.parseBoolean(configMap.get(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE)))
 {
       _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
true);
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bda87f2..584686e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -62,6 +62,7 @@ import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.Selection;
@@ -74,6 +75,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.common.utils.request.RequestUtils;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -112,6 +114,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
   private final boolean _enableCaseInsensitive;
   private final boolean _enableQueryLimitOverride;
+  private final int _hllLog2mOverride;
   private final TableCache _tableCache;
 
   public BaseBrokerRequestHandler(Configuration config, RoutingManager 
routingManager,
@@ -129,6 +132,13 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     } else {
       _tableCache = null;
     }
+    if 
(_config.containsKey(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M))
 {
+      _hllLog2mOverride = 
_config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
+          CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
+    } else {
+      _hllLog2mOverride = -1;
+    }
+
 
     _enableQueryLimitOverride = 
_config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
 
@@ -200,6 +210,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         LOGGER.warn("Caught exception while rewriting PQL to make it 
case-insensitive {}: {}, {}", requestId, query, e);
       }
     }
+    if (_hllLog2mOverride > 0) {
+      handleHyperloglogLog2mOverride(brokerRequest, _hllLog2mOverride);
+    }
     if (_enableQueryLimitOverride) {
       handleQueryLimitOverride(brokerRequest, _queryResponseLimit);
     }
@@ -426,6 +439,55 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   }
 
   /**
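+   * Appends the log2m override as the second argument of DistinctCountHLL-family functions that have only one.
+   * @param brokerRequest broker request whose aggregation functions are updated in place
+   * @param hllLog2mOverride log2m value to append as the extra argument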
+   */
+  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int 
hllLog2mOverride) {
+    if (brokerRequest.getAggregationsInfo() != null) {
+      for (AggregationInfo aggregationInfo : 
brokerRequest.getAggregationsInfo()) {
+        switch 
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
 {
+          case DISTINCTCOUNTHLL:
+          case DISTINCTCOUNTHLLMV:
+          case DISTINCTCOUNTRAWHLL:
+          case DISTINCTCOUNTRAWHLLMV:
+            if (aggregationInfo.getExpressionsSize() == 1) {
+              
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+            }
+        }
+      }
+    }
+    if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections() 
!= null) {
+      for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) {
+        updateDistinctCountHllExpr(expr, hllLog2mOverride);
+      }
+    }
+  }
+
+  private static void updateDistinctCountHllExpr(Expression expr, int 
hllLog2mOverride) {
+    if (expr == null || expr.getFunctionCall() == null) {
+      return;
+    }
+    Function functionCall = expr.getFunctionCall();
+    String funcName = functionCall.getOperator().toUpperCase();
+    switch (AggregationFunctionType.valueOf(funcName)) {
+      case DISTINCTCOUNTHLL:
+      case DISTINCTCOUNTHLLMV:
+      case DISTINCTCOUNTRAWHLL:
+      case DISTINCTCOUNTRAWHLLMV:
+        if (functionCall.getOperandsSize() == 1) {
+          
functionCall.addToOperands(RequestUtils.getLiteralExpression(hllLog2mOverride));
+        }
+        return;
+    }
+    if (functionCall.getOperandsSize() > 0) {
+      for (Expression operand : functionCall.getOperands()) {
+        updateDistinctCountHllExpr(operand, hllLog2mOverride);
+      }
+    }
+  }
+
+  /**
    * Reset limit for selection query if it exceeds maxQuerySelectionLimit.
    * @param brokerRequest
    * @param queryLimit
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 8ed27dd..0a06f6d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -44,6 +44,9 @@ public class CommonConstants {
     public static final String ENABLE_CASE_INSENSITIVE_KEY = 
"enable.case.insensitive";
     public static final String DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY = 
"enable.case.insensitive.pql";
 
+    public static final String CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M = 
"default.hyperloglog.log2m";
+    public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8;
+
     // More information on why these numbers are set can be found in the 
following doc:
     // 
https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
     public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 
24;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 3bb34f3..1030dc6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -78,6 +78,7 @@ public class HelixSetupUtils {
       Map<String, String> configMap = new HashMap<>();
       configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
Boolean.toString(true));
       configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(false));
+      configMap.put(CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, 
Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M));
       
configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Boolean.toString(false));
       admin.setConfig(configScope, configMap);
       LOGGER.info("New Helix cluster: {} created", helixClusterName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index b9a3fa8..9592518 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -185,6 +185,8 @@ public abstract class ControllerTest {
     }
     //enable case insensitive pql for test cases.
     configAccessor.set(scope, 
CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(true));
+    //Set hyperloglog log2m value to 12.
+    configAccessor.set(scope, 
CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, 
Integer.toString(12));
   }
 
   protected ControllerStarter getControllerStarter(ControllerConf config) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index d4392b2..c85b455 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -51,6 +51,7 @@ public class DistinctCountHLLValueAggregator implements 
ValueAggregator<Object,
       initialValue = deserializeAggregatedValue(bytes);
       _maxByteSize = Math.max(_maxByteSize, bytes.length);
     } else {
+      // TODO: Handle configurable log2m for StarTreeBuilder
       initialValue = new 
HyperLogLog(DistinctCountHLLAggregationFunction.DEFAULT_LOG2M);
       initialValue.offer(rawValue);
       _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 496b50e..f10b240 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -130,9 +130,9 @@ public class AggregationFunctionFactory {
           case DISTINCTCOUNT:
             return new DistinctCountAggregationFunction(column);
           case DISTINCTCOUNTHLL:
-            return new DistinctCountHLLAggregationFunction(column);
+            return new DistinctCountHLLAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLL:
-            return new DistinctCountRawHLLAggregationFunction(column);
+            return new DistinctCountRawHLLAggregationFunction(arguments);
           case FASTHLL:
             return new FastHLLAggregationFunction(column);
           case DISTINCTCOUNTTHETASKETCH:
@@ -154,9 +154,9 @@ public class AggregationFunctionFactory {
           case DISTINCTCOUNTMV:
             return new DistinctCountMVAggregationFunction(column);
           case DISTINCTCOUNTHLLMV:
-            return new DistinctCountHLLMVAggregationFunction(column);
+            return new DistinctCountHLLMVAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLLMV:
-            return new DistinctCountRawHLLMVAggregationFunction(column);
+            return new DistinctCountRawHLLMVAggregationFunction(arguments);
           case DISTINCT:
             Preconditions.checkState(brokerRequest != null,
                 "Broker request must be provided for 'DISTINCT' aggregation 
function");
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index 88beff9..6979153 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.google.common.base.Preconditions;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -35,9 +36,20 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
+  protected final int _log2M;
 
-  public DistinctCountHLLAggregationFunction(String column) {
-    super(column);
+  public DistinctCountHLLAggregationFunction(List<String> arguments) {
+    super(arguments.get(0));
+    int numExpressions = arguments.size();
+    // This function expects 1 or 2 arguments.
+    Preconditions
+        .checkArgument(numExpressions <= 2 && numExpressions >= 1, 
"DistinctCountHLL expects 1 or 2 arguments, got: ",
+            numExpressions);
+    if (arguments.size() == 2) {
+      _log2M = Integer.valueOf(arguments.get(1).replace("'", ""));
+    } else {
+      _log2M = DEFAULT_LOG2M;
+    }
   }
 
   @Override
@@ -67,7 +79,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
     DataType valueType = blockValSet.getValueType();
 
     if (valueType != DataType.BYTES) {
-      HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+      HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, 
_log2M);
       switch (valueType) {
         case INT:
           int[] intValues = blockValSet.getIntValuesSV();
@@ -135,31 +147,31 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]).offer(intValues[i]);
+          getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], 
_log2M).offer(intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]).offer(longValues[i]);
+          getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], 
_log2M).offer(longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]).offer(floatValues[i]);
+          getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], 
_log2M).offer(floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]).offer(doubleValues[i]);
+          getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], 
_log2M).offer(doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]).offer(stringValues[i]);
+          getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], 
_log2M).offer(stringValues[i]);
         }
         break;
       case BYTES:
@@ -197,7 +209,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
         for (int i = 0; i < length; i++) {
           int value = intValues[i];
           for (int groupKey : groupKeysArray[i]) {
-            getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value);
+            getDefaultHyperLogLog(groupByResultHolder, groupKey, 
_log2M).offer(value);
           }
         }
         break;
@@ -206,7 +218,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
         for (int i = 0; i < length; i++) {
           long value = longValues[i];
           for (int groupKey : groupKeysArray[i]) {
-            getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value);
+            getDefaultHyperLogLog(groupByResultHolder, groupKey, 
_log2M).offer(value);
           }
         }
         break;
@@ -215,7 +227,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
         for (int i = 0; i < length; i++) {
           float value = floatValues[i];
           for (int groupKey : groupKeysArray[i]) {
-            getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value);
+            getDefaultHyperLogLog(groupByResultHolder, groupKey, 
_log2M).offer(value);
           }
         }
         break;
@@ -224,7 +236,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
         for (int i = 0; i < length; i++) {
           double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
-            getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value);
+            getDefaultHyperLogLog(groupByResultHolder, groupKey, 
_log2M).offer(value);
           }
         }
         break;
@@ -233,7 +245,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
         for (int i = 0; i < length; i++) {
           String value = stringValues[i];
           for (int groupKey : groupKeysArray[i]) {
-            getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value);
+            getDefaultHyperLogLog(groupByResultHolder, groupKey, 
_log2M).offer(value);
           }
         }
         break;
@@ -267,7 +279,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
   public HyperLogLog extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return new HyperLogLog(_log2M);
     } else {
       return hyperLogLog;
     }
@@ -277,7 +289,7 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
   public HyperLogLog extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return new HyperLogLog(_log2M);
     } else {
       return hyperLogLog;
     }
@@ -327,12 +339,13 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
    * Returns the HyperLogLog from the result holder or creates a new one with 
default log2m if it does not exist.
    *
    * @param aggregationResultHolder Result holder
+   * @param log2m log2m value used to create the HyperLogLog when the result holder has none
    * @return HyperLogLog from the result holder
    */
-  protected static HyperLogLog getDefaultHyperLogLog(AggregationResultHolder 
aggregationResultHolder) {
+  protected static HyperLogLog getDefaultHyperLogLog(AggregationResultHolder 
aggregationResultHolder, int log2m) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = new HyperLogLog(log2m);
       aggregationResultHolder.setValue(hyperLogLog);
     }
     return hyperLogLog;
@@ -343,12 +356,13 @@ public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregat
    *
    * @param groupByResultHolder Result holder
    * @param groupKey Group key for which to return the HyperLogLog
+   * @param log2m log2m value used to create the HyperLogLog when the group key has none
    * @return HyperLogLog for the group key
    */
-  protected static HyperLogLog getDefaultHyperLogLog(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+  protected static HyperLogLog getDefaultHyperLogLog(GroupByResultHolder 
groupByResultHolder, int groupKey, int log2m) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = new HyperLogLog(log2m);
       groupByResultHolder.setValueForKey(groupKey, hyperLogLog);
     }
     return hyperLogLog;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
index 1ff547c..1c6e80d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -30,8 +31,8 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggregationFunction {
 
-  public DistinctCountHLLMVAggregationFunction(String column) {
-    super(column);
+  public DistinctCountHLLMVAggregationFunction(List<String> arguments) {
+    super(arguments);
   }
 
   @Override
@@ -47,7 +48,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
-    HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+    HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, 
_log2M);
 
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     DataType valueType = blockValSet.getValueType();
@@ -108,7 +109,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
       case INT:
         int[][] intValuesArray = blockValSet.getIntValuesMV();
         for (int i = 0; i < length; i++) {
-          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]);
+          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i], _log2M);
           for (int value : intValuesArray[i]) {
             hyperLogLog.offer(value);
           }
@@ -117,7 +118,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
       case LONG:
         long[][] longValuesArray = blockValSet.getLongValuesMV();
         for (int i = 0; i < length; i++) {
-          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]);
+          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i], _log2M);
           for (long value : longValuesArray[i]) {
             hyperLogLog.offer(value);
           }
@@ -126,7 +127,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
       case FLOAT:
         float[][] floatValuesArray = blockValSet.getFloatValuesMV();
         for (int i = 0; i < length; i++) {
-          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]);
+          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i], _log2M);
           for (float value : floatValuesArray[i]) {
             hyperLogLog.offer(value);
           }
@@ -135,7 +136,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
       case DOUBLE:
         double[][] doubleValuesArray = blockValSet.getDoubleValuesMV();
         for (int i = 0; i < length; i++) {
-          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]);
+          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i], _log2M);
           for (double value : doubleValuesArray[i]) {
             hyperLogLog.offer(value);
           }
@@ -144,7 +145,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
       case STRING:
         String[][] stringValuesArray = blockValSet.getStringValuesMV();
         for (int i = 0; i < length; i++) {
-          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i]);
+          HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, 
groupKeyArray[i], _log2M);
           for (String value : stringValuesArray[i]) {
             hyperLogLog.offer(value);
           }
@@ -168,7 +169,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
         for (int i = 0; i < length; i++) {
           int[] intValues = intValuesArray[i];
           for (int groupKey : groupKeysArray[i]) {
-            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey);
+            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M);
             for (int value : intValues) {
               hyperLogLog.offer(value);
             }
@@ -180,7 +181,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
         for (int i = 0; i < length; i++) {
           long[] longValues = longValuesArray[i];
           for (int groupKey : groupKeysArray[i]) {
-            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey);
+            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M);
             for (long value : longValues) {
               hyperLogLog.offer(value);
             }
@@ -192,7 +193,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
         for (int i = 0; i < length; i++) {
           float[] floatValues = floatValuesArray[i];
           for (int groupKey : groupKeysArray[i]) {
-            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey);
+            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M);
             for (float value : floatValues) {
               hyperLogLog.offer(value);
             }
@@ -204,7 +205,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
         for (int i = 0; i < length; i++) {
           double[] doubleValues = doubleValuesArray[i];
           for (int groupKey : groupKeysArray[i]) {
-            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey);
+            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M);
             for (double value : doubleValues) {
               hyperLogLog.offer(value);
             }
@@ -216,7 +217,7 @@ public class DistinctCountHLLMVAggregationFunction extends 
DistinctCountHLLAggre
         for (int i = 0; i < length; i++) {
           String[] stringValues = stringValuesArray[i];
           for (int groupKey : groupKeysArray[i]) {
-            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey);
+            HyperLogLog hyperLogLog = 
getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M);
             for (String value : stringValues) {
               hyperLogLog.offer(value);
             }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
index 40050f6..9444948 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -32,8 +33,8 @@ import 
org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 public class DistinctCountRawHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, SerializedHLL> {
   private final DistinctCountHLLAggregationFunction 
_distinctCountHLLAggregationFunction;
 
-  public DistinctCountRawHLLAggregationFunction(String column) {
-    this(column, new DistinctCountHLLAggregationFunction(column));
+  public DistinctCountRawHLLAggregationFunction(List<String> arguments) {
+    this(arguments.get(0), new DistinctCountHLLAggregationFunction(arguments));
   }
 
   DistinctCountRawHLLAggregationFunction(String column,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
index 3712599..8365db9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.List;
 import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class DistinctCountRawHLLMVAggregationFunction extends 
DistinctCountRawHLLAggregationFunction {
 
-  public DistinctCountRawHLLMVAggregationFunction(String column) {
-    super(column, new DistinctCountHLLMVAggregationFunction(column));
+  public DistinctCountRawHLLMVAggregationFunction(List<String> arguments) {
+    super(arguments.get(0), new 
DistinctCountHLLMVAggregationFunction(arguments));
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index fc06315..fa75eb2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1070,4 +1070,32 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       return true;
     }, 10_000L, "Failed to get results for case-insensitive queries");
   }
+
+  @Test
+  public void testDistinctCountHll()
+      throws Exception {
+    String query;
+
+    // The exact distinct count of FlightNum is 6538.
+    query = "SELECT distinctCount(FlightNum) FROM mytable ";
+    
assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(),
 6538);
+    assertEquals(postSqlQuery(query, 
_brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), 6538);
+
+    // Expected distinctCountHll with different log2m value from 2 to 19. The 
Accurate value is 6538.
+    long[] expectedResults =
+        new long[]{3504, 6347, 8877, 9729, 9046, 7672, 7538, 6993, 6649, 6651, 
6553, 6525, 6459, 6523, 6532, 6544, 6538, 6539};
+
+    for (int i = 2; i < 20; i++) {
+      query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM 
mytable ", i);
+      
assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(),
 expectedResults[i - 2]);
+      assertEquals(postSqlQuery(query, 
_brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(),
+          expectedResults[i - 2]);
+    }
+
+    // Default HLL is set as log2m=12
+    query = "SELECT distinctCountHLL(FlightNum) FROM mytable ";
+    
assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(),
 expectedResults[10]);
+    assertEquals(postSqlQuery(query, 
_brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(),
+        expectedResults[10]);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to