Jackie-Jiang commented on a change in pull request #5564:
URL: https://github.com/apache/incubator-pinot/pull/5564#discussion_r440342788
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
return brokerResponse;
}
+ /**
+ * Set Log2m value for DistinctCountHLL Function
+ * @param brokerRequest
+ * @param hllLog2mOverride
+ */
+ static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int
hllLog2mOverride) {
+ if (brokerRequest.getAggregationsInfo() != null) {
+ for (AggregationInfo aggregationInfo :
brokerRequest.getAggregationsInfo()) {
+ switch
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
{
+ case DISTINCTCOUNTHLL:
+ case DISTINCTCOUNTHLLMV:
+ case DISTINCTCOUNTRAWHLL:
+ case DISTINCTCOUNTRAWHLLMV:
+ if (aggregationInfo.getExpressionsSize() == 1) {
+
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+ }
+ }
+ }
+ }
+ if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections()
!= null) {
Review comment:
Why do we need `brokerRequest.getSelections() != null` here? It will
never be true for aggregation queries, right?
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
##########
@@ -191,11 +191,19 @@ public void start()
new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build();
Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays
.asList(Helix.ENABLE_CASE_INSENSITIVE_KEY,
Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY,
- Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE));
+ Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE,
Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M));
if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY))
|| Boolean
.parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) {
_brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true);
}
+ String log2mStr = configMap.get(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M);
+ if (log2mStr != null) {
+ try {
+ _brokerConf.setProperty(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
Integer.parseInt(log2mStr));
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Unable to set Log2M value {} for DistinctCountHLL
function. {}", log2mStr, e);
Review comment:
```suggestion
LOGGER.warn("Invalid config of '{}': '{}', using: {} as the default
log2m for HyperLogLog", Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, log2mStr,
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
```
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
public class DistinctCountHLLAggregationFunction extends
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
public static final int DEFAULT_LOG2M = 8;
+ protected final int _log2M;
Review comment:
(nit) Rename to `_log2m`?
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
return brokerResponse;
}
+ /**
+ * Set Log2m value for DistinctCountHLL Function
+ * @param brokerRequest
+ * @param hllLog2mOverride
+ */
+ static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int
hllLog2mOverride) {
+ if (brokerRequest.getAggregationsInfo() != null) {
+ for (AggregationInfo aggregationInfo :
brokerRequest.getAggregationsInfo()) {
+ switch
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
{
+ case DISTINCTCOUNTHLL:
+ case DISTINCTCOUNTHLLMV:
+ case DISTINCTCOUNTRAWHLL:
+ case DISTINCTCOUNTRAWHLLMV:
+ if (aggregationInfo.getExpressionsSize() == 1) {
+
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+ }
+ }
+ }
+ }
+ if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections()
!= null) {
+ for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) {
+ updateDistinctCountHllExpr(expr, hllLog2mOverride);
+ }
+ }
+ }
+
+ private static void updateDistinctCountHllExpr(Expression expr, int
hllLog2mOverride) {
+ if (expr == null || expr.getFunctionCall() == null) {
+ return;
+ }
+ Function functionCall = expr.getFunctionCall();
Review comment:
`expr` can never be `null` here.
```suggestion
Function functionCall = expr.getFunctionCall();
if (functionCall == null) {
return;
}
```
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
return brokerResponse;
}
+ /**
+ * Set Log2m value for DistinctCountHLL Function
+ * @param brokerRequest
+ * @param hllLog2mOverride
+ */
+ static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int
hllLog2mOverride) {
+ if (brokerRequest.getAggregationsInfo() != null) {
+ for (AggregationInfo aggregationInfo :
brokerRequest.getAggregationsInfo()) {
+ switch
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
{
Review comment:
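Please handle the exception here (`AggregationFunctionType.valueOf()` throws
`IllegalArgumentException` for an unknown function name), or switch on the String
instead of `AggregationFunctionType`.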
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -112,6 +114,7 @@
private final boolean _enableCaseInsensitive;
private final boolean _enableQueryLimitOverride;
+ private final int _hllLog2mOverride;
Review comment:
Rename to `_defaultHllLog2m`?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
##########
@@ -47,7 +48,7 @@ public void accept(AggregationFunctionVisitorBase visitor) {
@Override
public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
- HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+ HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder,
_log2M);
Review comment:
Same here: don't pass `_log2M`.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
public class DistinctCountHLLAggregationFunction extends
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
public static final int DEFAULT_LOG2M = 8;
+ protected final int _log2M;
- public DistinctCountHLLAggregationFunction(String column) {
- super(column);
+ public DistinctCountHLLAggregationFunction(List<String> arguments) {
+ super(arguments.get(0));
+ int numExpressions = arguments.size();
+ // This function expects 1 or 2 arguments.
+ Preconditions
+ .checkArgument(numExpressions <= 2 && numExpressions >= 1,
"DistinctCountHLL expects 1 or 2 arguments, got: ",
+ numExpressions);
+ if (arguments.size() == 2) {
+ _log2M = Integer.valueOf(arguments.get(1).replace("'", ""));
Review comment:
No need to replace single quotes here.
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -44,6 +44,9 @@
public static final String ENABLE_CASE_INSENSITIVE_KEY =
"enable.case.insensitive";
public static final String DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY =
"enable.case.insensitive.pql";
+ public static final String CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M =
"default.hyperloglog.log2m";
Review comment:
```suggestion
public static final String DEFAULT_HYPERLOGLOG_LOG2M_KEY =
"default.hyperloglog.log2m";
```
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -425,6 +437,58 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
return brokerResponse;
}
+ /**
+ * Set Log2m value for DistinctCountHLL Function
+ * @param brokerRequest
+ * @param hllLog2mOverride
+ */
+ static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int
hllLog2mOverride) {
+ if (brokerRequest.getAggregationsInfo() != null) {
+ for (AggregationInfo aggregationInfo :
brokerRequest.getAggregationsInfo()) {
+ switch
(AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase()))
{
+ case DISTINCTCOUNTHLL:
+ case DISTINCTCOUNTHLLMV:
+ case DISTINCTCOUNTRAWHLL:
+ case DISTINCTCOUNTRAWHLLMV:
+ if (aggregationInfo.getExpressionsSize() == 1) {
+
aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride));
+ }
+ }
+ }
+ }
+ if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections()
!= null) {
+ for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) {
+ updateDistinctCountHllExpr(expr, hllLog2mOverride);
+ }
+ }
+ }
+
+ private static void updateDistinctCountHllExpr(Expression expr, int
hllLog2mOverride) {
+ if (expr == null || expr.getFunctionCall() == null) {
+ return;
+ }
+ Function functionCall = expr.getFunctionCall();
+ try {
+ switch
(AggregationFunctionType.getAggregationFunctionType(functionCall.getOperator()))
{
+ case DISTINCTCOUNTHLL:
+ case DISTINCTCOUNTHLLMV:
+ case DISTINCTCOUNTRAWHLL:
+ case DISTINCTCOUNTRAWHLLMV:
+ if (functionCall.getOperandsSize() == 1) {
+
functionCall.addToOperands(RequestUtils.getLiteralExpression(hllLog2mOverride));
+ }
+ return;
+ }
+ } catch (Exception e) {
+ // Swallow exceptions
+ }
+ if (functionCall.getOperandsSize() > 0) {
Review comment:
(nit) This check is redundant?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -67,7 +79,7 @@ public void aggregate(int length, AggregationResultHolder
aggregationResultHolde
DataType valueType = blockValSet.getValueType();
if (valueType != DataType.BYTES) {
- HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
+ HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder,
_log2M);
Review comment:
You don't need to pass the member variable `_log2M` to
`getDefaultHyperLogLog()`; just removing the `static` modifier should be enough.
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
##########
@@ -129,6 +132,12 @@ public BaseBrokerRequestHandler(Configuration config,
RoutingManager routingMana
} else {
_tableCache = null;
}
+ if
(_config.containsKey(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M))
{
+ _hllLog2mOverride =
_config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
+ CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
+ } else {
+ _hllLog2mOverride = -1;
+ }
Review comment:
_defaultHllLog2m =
```suggestion
_hllLog2mOverride =
_config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M,
CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
```
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
public class DistinctCountHLLAggregationFunction extends
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
public static final int DEFAULT_LOG2M = 8;
+ protected final int _log2M;
- public DistinctCountHLLAggregationFunction(String column) {
- super(column);
+ public DistinctCountHLLAggregationFunction(List<String> arguments) {
+ super(arguments.get(0));
+ int numExpressions = arguments.size();
+ // This function expects 1 or 2 arguments.
+ Preconditions
+ .checkArgument(numExpressions <= 2 && numExpressions >= 1,
"DistinctCountHLL expects 1 or 2 arguments, got: ",
+ numExpressions);
Review comment:
```suggestion
Preconditions
.checkArgument(numExpressions <= 2 && numExpressions >= 1,
"DistinctCountHLL expects 1 or 2 arguments, got: %s",
numExpressions);
```
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
##########
@@ -35,9 +36,20 @@
public class DistinctCountHLLAggregationFunction extends
BaseSingleInputAggregationFunction<HyperLogLog, Long> {
public static final int DEFAULT_LOG2M = 8;
Review comment:
Replace this with the one in `CommonConstants`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]