Jackie-Jiang commented on a change in pull request #5564:
URL: https://github.com/apache/incubator-pinot/pull/5564#discussion_r440342788



##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable RequesterIdentit
     return brokerResponse;
   }
 
+  /**
+   * Set Log2m value for DistinctCountHLL Function
+   * @param brokerRequest
+   * @param hllLog2mOverride
+   */
+  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int 
hllLog2mOverride) {
+    if (brokerRequest.getAggregationsInfo() != null) {
+      for (AggregationInfo aggregationInfo : 
brokerRequest.getAggregationsInfo()) {
+        switch 
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
 {
+          case DISTINCTCOUNTHLL:
+          case DISTINCTCOUNTHLLMV:
+          case DISTINCTCOUNTRAWHLL:
+          case DISTINCTCOUNTRAWHLLMV:
+            if (aggregationInfo.getExpressionsSize() == 1) {
+              
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+            }
+        }
+      }
+    }
+    if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections() 
!= null) {

Review comment:
       Why do we need `brokerRequest.getSelections() != null` here? It will 
never be true for aggregation right?

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
##########
@@ -191,11 +191,19 @@ public void start()
         new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
     Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays
         .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, 
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
-            Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE));
+            Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, 
Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M));
     if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) 
|| Boolean
         
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) {
       _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true);
     }
+    String log2mStr = configMap.get(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M);
+    if (log2mStr != null) {
+      try {
+        _brokerConf.setProperty(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, 
Integer.parseInt(log2mStr));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Unable to set Log2M value {} for DistinctCountHLL 
function. {}", log2mStr, e);

Review comment:
       ```suggestion
           LOGGER.warn("Invalid config of '{}': '{}', using: {} as the default 
log2m for HyperLogLog", Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, log2mStr, 
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
 
 public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
+  protected final int _log2M;

Review comment:
       (nit) Rename to `_log2m`?

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable RequesterIdentit
     return brokerResponse;
   }
 
+  /**
+   * Set Log2m value for DistinctCountHLL Function
+   * @param brokerRequest
+   * @param hllLog2mOverride
+   */
+  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int 
hllLog2mOverride) {
+    if (brokerRequest.getAggregationsInfo() != null) {
+      for (AggregationInfo aggregationInfo : 
brokerRequest.getAggregationsInfo()) {
+        switch 
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
 {
+          case DISTINCTCOUNTHLL:
+          case DISTINCTCOUNTHLLMV:
+          case DISTINCTCOUNTRAWHLL:
+          case DISTINCTCOUNTRAWHLLMV:
+            if (aggregationInfo.getExpressionsSize() == 1) {
+              
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+            }
+        }
+      }
+    }
+    if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections() 
!= null) {
+      for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) {
+        updateDistinctCountHllExpr(expr, hllLog2mOverride);
+      }
+    }
+  }
+
+  private static void updateDistinctCountHllExpr(Expression expr, int 
hllLog2mOverride) {
+    if (expr == null || expr.getFunctionCall() == null) {
+      return;
+    }
+    Function functionCall = expr.getFunctionCall();

Review comment:
       `expr` can never be `null` here.
   ```suggestion
       Function functionCall = expr.getFunctionCall();
       if (functionCall == null) {
         return;
       }
   ```

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable RequesterIdentit
     return brokerResponse;
   }
 
+  /**
+   * Set Log2m value for DistinctCountHLL Function
+   * @param brokerRequest
+   * @param hllLog2mOverride
+   */
+  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int 
hllLog2mOverride) {
+    if (brokerRequest.getAggregationsInfo() != null) {
+      for (AggregationInfo aggregationInfo : 
brokerRequest.getAggregationsInfo()) {
+        switch 
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
 {

Review comment:
       Please handle exception here, or switch on String instead of 
AggregationFunctionType

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -112,6 +114,7 @@
 
   private final boolean _enableCaseInsensitive;
   private final boolean _enableQueryLimitOverride;
+  private final int _hllLog2mOverride;

Review comment:
       Rename to `_defaultHllLog2m`?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
##########
@@ -47,7 +48,7 @@ public void accept(AggregationFunctionVisitorBase visitor) {
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
-    HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+    HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, 
_log2M);

Review comment:
       Same here, don't pass `_log2M`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
 
 public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
+  protected final int _log2M;
 
-  public DistinctCountHLLAggregationFunction(String column) {
-    super(column);
+  public DistinctCountHLLAggregationFunction(List<String> arguments) {
+    super(arguments.get(0));
+    int numExpressions = arguments.size();
+    // This function expects 1 or 2 arguments.
+    Preconditions
+        .checkArgument(numExpressions <= 2 && numExpressions >= 1, 
"DistinctCountHLL expects 1 or 2 arguments, got: ",
+            numExpressions);
+    if (arguments.size() == 2) {
+      _log2M = Integer.valueOf(arguments.get(1).replace("'", ""));

Review comment:
       No need to replace single quote here

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -44,6 +44,9 @@
     public static final String ENABLE_CASE_INSENSITIVE_KEY = 
"enable.case.insensitive";
     public static final String DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY = 
"enable.case.insensitive.pql";
 
+    public static final String CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M = 
"default.hyperloglog.log2m";

Review comment:
       ```suggestion
       public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY = 
"default.hyperloglog.log2m";
   ```

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request, 
@Nullable RequesterIdentit
     return brokerResponse;
   }
 
+  /**
+   * Set Log2m value for DistinctCountHLL Function
+   * @param brokerRequest
+   * @param hllLog2mOverride
+   */
+  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int 
hllLog2mOverride) {
+    if (brokerRequest.getAggregationsInfo() != null) {
+      for (AggregationInfo aggregationInfo : 
brokerRequest.getAggregationsInfo()) {
+        switch 
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
 {
+          case DISTINCTCOUNTHLL:
+          case DISTINCTCOUNTHLLMV:
+          case DISTINCTCOUNTRAWHLL:
+          case DISTINCTCOUNTRAWHLLMV:
+            if (aggregationInfo.getExpressionsSize() == 1) {
+              
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+            }
+        }
+      }
+    }
+    if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections() 
!= null) {
+      for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) {
+        updateDistinctCountHllExpr(expr, hllLog2mOverride);
+      }
+    }
+  }
+
+  private static void updateDistinctCountHllExpr(Expression expr, int 
hllLog2mOverride) {
+    if (expr == null || expr.getFunctionCall() == null) {
+      return;
+    }
+    Function functionCall = expr.getFunctionCall();
+    try {
+      switch 
(AggregationFunctionType.getAggregationFunctionType(functionCall.getOperator()))
 {
+        case DISTINCTCOUNTHLL:
+        case DISTINCTCOUNTHLLMV:
+        case DISTINCTCOUNTRAWHLL:
+        case DISTINCTCOUNTRAWHLLMV:
+          if (functionCall.getOperandsSize() == 1) {
+            
functionCall.addToOperands(RequestUtils.getLiteralExpression(hllLog2mOverride));
+          }
+          return;
+      }
+    } catch (Exception e) {
+      // Swallow exceptions
+    }
+    if (functionCall.getOperandsSize() > 0) {

Review comment:
       (nit) This check is redundant?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -67,7 +79,7 @@ public void aggregate(int length, AggregationResultHolder 
aggregationResultHolde
     DataType valueType = blockValSet.getValueType();
 
     if (valueType != DataType.BYTES) {
-      HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+      HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, 
_log2M);

Review comment:
       You don't need to pass member variable `_log2M` to 
`getDefaultHyperLogLog()`, just remove the `static` should be good

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -129,6 +132,12 @@ public BaseBrokerRequestHandler(Configuration config, 
RoutingManager routingMana
     } else {
       _tableCache = null;
     }
+    if 
(_config.containsKey(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M))
 {
+      _hllLog2mOverride = 
_config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
+          CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
+    } else {
+      _hllLog2mOverride = -1;
+    }

Review comment:
       _defaultHllLog2m = 
   ```suggestion
       _hllLog2mOverride = 
_config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
             CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
 
 public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
+  protected final int _log2M;
 
-  public DistinctCountHLLAggregationFunction(String column) {
-    super(column);
+  public DistinctCountHLLAggregationFunction(List<String> arguments) {
+    super(arguments.get(0));
+    int numExpressions = arguments.size();
+    // This function expects 1 or 2 arguments.
+    Preconditions
+        .checkArgument(numExpressions <= 2 && numExpressions >= 1, 
"DistinctCountHLL expects 1 or 2 arguments, got: ",
+            numExpressions);

Review comment:
       ```suggestion
       Preconditions
           .checkArgument(numExpressions <= 2 && numExpressions >= 1, 
"DistinctCountHLL expects 1 or 2 arguments, got: %s",
               numExpressions);
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
 
 public class DistinctCountHLLAggregationFunction extends 
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;

Review comment:
       Replace this with the one in `CommonConstants`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to