This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch cluster_config_log2m in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit fec0e54e7622770cbf0ea9ce9fa63e610963037a Author: Xiang Fu <[email protected]> AuthorDate: Sun Jun 14 04:26:57 2020 -0700 Adding support to configure log2m value for hyperloglog --- .../broker/broker/helix/HelixBrokerStarter.java | 10 +++- .../requesthandler/BaseBrokerRequestHandler.java | 62 ++++++++++++++++++++++ .../apache/pinot/common/utils/CommonConstants.java | 3 ++ .../helix/core/util/HelixSetupUtils.java | 1 + .../pinot/controller/helix/ControllerTest.java | 2 + .../DistinctCountHLLValueAggregator.java | 1 + .../function/AggregationFunctionFactory.java | 8 +-- .../DistinctCountHLLAggregationFunction.java | 52 +++++++++++------- .../DistinctCountHLLMVAggregationFunction.java | 27 +++++----- .../DistinctCountRawHLLAggregationFunction.java | 5 +- .../DistinctCountRawHLLMVAggregationFunction.java | 5 +- .../tests/OfflineClusterIntegrationTest.java | 28 ++++++++++ 12 files changed, 163 insertions(+), 41 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index 822b37e..5bee98b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -191,11 +191,19 @@ public class HelixBrokerStarter implements ServiceStartable { new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_clusterName).build(); Map<String, String> configMap = configAccessor.get(helixConfigScope, Arrays .asList(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY, - Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE)); + Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M)); if (Boolean.parseBoolean(configMap.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean .parseBoolean(configMap.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY))) { _brokerConf.setProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, true); } + String log2mStr = configMap.get(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M); + if (log2mStr != null) { + try { + _brokerConf.setProperty(Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, Integer.parseInt(log2mStr)); + } catch (NumberFormatException e) { + LOGGER.warn("Unable to set Log2M value {} for DistinctCountHLL function. {}", log2mStr, e); + } + } if (Boolean.parseBoolean(configMap.get(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE))) { _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, true); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index bda87f2..584686e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -62,6 +62,7 @@ import org.apache.pinot.common.request.FilterOperator; import org.apache.pinot.common.request.FilterQuery; import org.apache.pinot.common.request.FilterQueryMap; import org.apache.pinot.common.request.Identifier; +import org.apache.pinot.common.request.Function; import org.apache.pinot.common.request.Literal; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.Selection; @@ -74,6 +75,7 @@ import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Broker; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.helix.TableCache; +import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.reduce.BrokerReduceService; import org.apache.pinot.core.transport.ServerInstance; @@ -112,6 +114,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final boolean _enableCaseInsensitive; private final boolean _enableQueryLimitOverride; + private final int _hllLog2mOverride; private final TableCache _tableCache; public BaseBrokerRequestHandler(Configuration config, RoutingManager routingManager, @@ -129,6 +132,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } else { _tableCache = null; } + if (_config.containsKey(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M)) { + _hllLog2mOverride = _config.getInt(CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, + CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M); + } else { + _hllLog2mOverride = -1; + } + _enableQueryLimitOverride = _config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false); @@ -200,6 +210,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { LOGGER.warn("Caught exception while rewriting PQL to make it case-insensitive {}: {}, {}", requestId, query, e); } } + if (_hllLog2mOverride > 0) { + handleHyperloglogLog2mOverride(brokerRequest, _hllLog2mOverride); + } if (_enableQueryLimitOverride) { handleQueryLimitOverride(brokerRequest, _queryResponseLimit); } @@ -426,6 +439,55 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } /** + * Set Log2m value for DistinctCountHLL Function + * @param brokerRequest + * @param hllLog2mOverride + */ + static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) { + if (brokerRequest.getAggregationsInfo() != null) { + for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) { + switch (AggregationFunctionType.valueOf(aggregationInfo.getAggregationType().toUpperCase())) { + case DISTINCTCOUNTHLL: + case DISTINCTCOUNTHLLMV: + case DISTINCTCOUNTRAWHLL: + case DISTINCTCOUNTRAWHLLMV: + if (aggregationInfo.getExpressionsSize() == 1) { + aggregationInfo.addToExpressions(Integer.toString(hllLog2mOverride)); + } + } + } + } + if (brokerRequest.getPinotQuery() != null && brokerRequest.getSelections() != null) { + for (Expression expr : brokerRequest.getPinotQuery().getSelectList()) { + updateDistinctCountHllExpr(expr, hllLog2mOverride); + } + } + } + + private static void updateDistinctCountHllExpr(Expression expr, int hllLog2mOverride) { + if (expr == null || expr.getFunctionCall() == null) { + return; + } + Function functionCall = expr.getFunctionCall(); + String funcName = functionCall.getOperator().toUpperCase(); + switch (AggregationFunctionType.valueOf(funcName)) { + case DISTINCTCOUNTHLL: + case DISTINCTCOUNTHLLMV: + case DISTINCTCOUNTRAWHLL: + case DISTINCTCOUNTRAWHLLMV: + if (functionCall.getOperandsSize() == 1) { + functionCall.addToOperands(RequestUtils.getLiteralExpression(hllLog2mOverride)); + } + return; + } + if (functionCall.getOperandsSize() > 0) { + for (Expression operand : functionCall.getOperands()) { + updateDistinctCountHllExpr(operand, hllLog2mOverride); + } + } + } + + /** * Reset limit for selection query if it exceeds maxQuerySelectionLimit. * @param brokerRequest * @param queryLimit diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 8ed27dd..0a06f6d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -44,6 +44,9 @@ public class CommonConstants { public static final String ENABLE_CASE_INSENSITIVE_KEY = "enable.case.insensitive"; public static final String DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY = "enable.case.insensitive.pql"; + public static final String CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M = "default.hyperloglog.log2m"; + public static final int DEFAULT_HYPERLOGLOG_LOG2M = 8; + // More information on why these numbers are set can be found in the following doc: // https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 24; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 3bb34f3..1030dc6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -78,6 +78,7 @@ public class HelixSetupUtils { Map<String, String> configMap = new HashMap<>(); configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true)); configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(false)); + configMap.put(CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M)); configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false)); admin.setConfig(configScope, configMap); LOGGER.info("New Helix cluster: {} created", helixClusterName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index b9a3fa8..9592518 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -185,6 +185,8 @@ public abstract class ControllerTest { } //enable case insensitive pql for test cases. configAccessor.set(scope, CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(true)); + //Set hyperloglog log2m value to 12. + configAccessor.set(scope, CommonConstants.Helix.CONFIG_OF_DEFAULT_HYPERLOGLOG_LOG2M, Integer.toString(12)); } protected ControllerStarter getControllerStarter(ControllerConf config) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java index d4392b2..c85b455 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java @@ -51,6 +51,7 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object, initialValue = deserializeAggregatedValue(bytes); _maxByteSize = Math.max(_maxByteSize, bytes.length); } else { + // TODO: Handle configurable log2m for StarTreeBuilder initialValue = new HyperLogLog(DistinctCountHLLAggregationFunction.DEFAULT_LOG2M); initialValue.offer(rawValue); _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 496b50e..f10b240 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -130,9 +130,9 @@ public class AggregationFunctionFactory { case DISTINCTCOUNT: return new DistinctCountAggregationFunction(column); case DISTINCTCOUNTHLL: - return new DistinctCountHLLAggregationFunction(column); + return new DistinctCountHLLAggregationFunction(arguments); case DISTINCTCOUNTRAWHLL: - return new DistinctCountRawHLLAggregationFunction(column); + return new DistinctCountRawHLLAggregationFunction(arguments); case FASTHLL: return new FastHLLAggregationFunction(column); case DISTINCTCOUNTTHETASKETCH: @@ -154,9 +154,9 @@ public class AggregationFunctionFactory { case DISTINCTCOUNTMV: return new DistinctCountMVAggregationFunction(column); case DISTINCTCOUNTHLLMV: - return new DistinctCountHLLMVAggregationFunction(column); + return new DistinctCountHLLMVAggregationFunction(arguments); case DISTINCTCOUNTRAWHLLMV: - return new DistinctCountRawHLLMVAggregationFunction(column); + return new DistinctCountRawHLLMVAggregationFunction(arguments); case DISTINCT: Preconditions.checkState(brokerRequest != null, "Broker request must be provided for 'DISTINCT' aggregation function"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java index 88beff9..6979153 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function; import com.clearspring.analytics.stream.cardinality.HyperLogLog; import com.google.common.base.Preconditions; +import java.util.List; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.request.transform.TransformExpressionTree; @@ -35,9 +36,20 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregationFunction<HyperLogLog, Long> { public static final int DEFAULT_LOG2M = 8; + protected final int _log2M; - public DistinctCountHLLAggregationFunction(String column) { - super(column); + public DistinctCountHLLAggregationFunction(List<String> arguments) { + super(arguments.get(0)); + int numExpressions = arguments.size(); + // This function expects 1 or 2 arguments. + Preconditions + .checkArgument(numExpressions <= 2 && numExpressions >= 1, "DistinctCountHLL expects 1 or 2 arguments, got: ", + numExpressions); + if (arguments.size() == 2) { + _log2M = Integer.valueOf(arguments.get(1).replace("'", "")); + } else { + _log2M = DEFAULT_LOG2M; + } } @Override @@ -67,7 +79,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat DataType valueType = blockValSet.getValueType(); if (valueType != DataType.BYTES) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, _log2M); switch (valueType) { case INT: int[] intValues = blockValSet.getIntValuesSV(); @@ -135,31 +147,31 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat case INT: int[] intValues = blockValSet.getIntValuesSV(); for (int i = 0; i < length; i++) { - getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(intValues[i]); + getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M).offer(intValues[i]); } break; case LONG: long[] longValues = blockValSet.getLongValuesSV(); for (int i = 0; i < length; i++) { - getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(longValues[i]); + getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M).offer(longValues[i]); } break; case FLOAT: float[] floatValues = blockValSet.getFloatValuesSV(); for (int i = 0; i < length; i++) { - getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(floatValues[i]); + getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M).offer(floatValues[i]); } break; case DOUBLE: double[] doubleValues = blockValSet.getDoubleValuesSV(); for (int i = 0; i < length; i++) { - getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(doubleValues[i]); + getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M).offer(doubleValues[i]); } break; case STRING: String[] stringValues = blockValSet.getStringValuesSV(); for (int i = 0; i < length; i++) { - getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(stringValues[i]); + getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M).offer(stringValues[i]); } break; case BYTES: @@ -197,7 +209,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat for (int i = 0; i < length; i++) { int value = intValues[i]; for (int groupKey : groupKeysArray[i]) { - getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value); + getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M).offer(value); } } break; @@ -206,7 +218,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat for (int i = 0; i < length; i++) { long value = longValues[i]; for (int groupKey : groupKeysArray[i]) { - getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value); + getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M).offer(value); } } break; @@ -215,7 +227,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat for (int i = 0; i < length; i++) { float value = floatValues[i]; for (int groupKey : groupKeysArray[i]) { - getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value); + getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M).offer(value); } } break; @@ -224,7 +236,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat for (int i = 0; i < length; i++) { double value = doubleValues[i]; for (int groupKey : groupKeysArray[i]) { - getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value); + getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M).offer(value); } } break; @@ -233,7 +245,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat for (int i = 0; i < length; i++) { String value = stringValues[i]; for (int groupKey : groupKeysArray[i]) { - getDefaultHyperLogLog(groupByResultHolder, groupKey).offer(value); + getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M).offer(value); } } break; @@ -267,7 +279,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat public HyperLogLog extractAggregationResult(AggregationResultHolder aggregationResultHolder) { HyperLogLog hyperLogLog = aggregationResultHolder.getResult(); if (hyperLogLog == null) { - return new HyperLogLog(DEFAULT_LOG2M); + return new HyperLogLog(_log2M); } else { return hyperLogLog; } @@ -277,7 +289,7 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat public HyperLogLog extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey); if (hyperLogLog == null) { - return new HyperLogLog(DEFAULT_LOG2M); + return new HyperLogLog(_log2M); } else { return hyperLogLog; } @@ -327,12 +339,13 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat * Returns the HyperLogLog from the result holder or creates a new one with default log2m if it does not exist. * * @param aggregationResultHolder Result holder + * @param log2m * @return HyperLogLog from the result holder */ - protected static HyperLogLog getDefaultHyperLogLog(AggregationResultHolder aggregationResultHolder) { + protected static HyperLogLog getDefaultHyperLogLog(AggregationResultHolder aggregationResultHolder, int log2m) { HyperLogLog hyperLogLog = aggregationResultHolder.getResult(); if (hyperLogLog == null) { - hyperLogLog = new HyperLogLog(DEFAULT_LOG2M); + hyperLogLog = new HyperLogLog(log2m); aggregationResultHolder.setValue(hyperLogLog); } return hyperLogLog; @@ -343,12 +356,13 @@ public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregat * * @param groupByResultHolder Result holder * @param groupKey Group key for which to return the HyperLogLog + * @param log2m * @return HyperLogLog for the group key */ - protected static HyperLogLog getDefaultHyperLogLog(GroupByResultHolder groupByResultHolder, int groupKey) { + protected static HyperLogLog getDefaultHyperLogLog(GroupByResultHolder groupByResultHolder, int groupKey, int log2m) { HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey); if (hyperLogLog == null) { - hyperLogLog = new HyperLogLog(DEFAULT_LOG2M); + hyperLogLog = new HyperLogLog(log2m); groupByResultHolder.setValueForKey(groupKey, hyperLogLog); } return hyperLogLog; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java index 1ff547c..1c6e80d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.aggregation.function; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import java.util.List; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.request.transform.TransformExpressionTree; @@ -30,8 +31,8 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggregationFunction { - public DistinctCountHLLMVAggregationFunction(String column) { - super(column); + public DistinctCountHLLMVAggregationFunction(List<String> arguments) { + super(arguments); } @Override @@ -47,7 +48,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<TransformExpressionTree, BlockValSet> blockValSetMap) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder, _log2M); BlockValSet blockValSet = blockValSetMap.get(_expression); DataType valueType = blockValSet.getValueType(); @@ -108,7 +109,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre case INT: int[][] intValuesArray = blockValSet.getIntValuesMV(); for (int i = 0; i < length; i++) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M); for (int value : intValuesArray[i]) { hyperLogLog.offer(value); } @@ -117,7 +118,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre case LONG: long[][] longValuesArray = blockValSet.getLongValuesMV(); for (int i = 0; i < length; i++) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M); for (long value : longValuesArray[i]) { hyperLogLog.offer(value); } @@ -126,7 +127,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre case FLOAT: float[][] floatValuesArray = blockValSet.getFloatValuesMV(); for (int i = 0; i < length; i++) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M); for (float value : floatValuesArray[i]) { hyperLogLog.offer(value); } @@ -135,7 +136,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre case DOUBLE: double[][] doubleValuesArray = blockValSet.getDoubleValuesMV(); for (int i = 0; i < length; i++) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M); for (double value : doubleValuesArray[i]) { hyperLogLog.offer(value); } @@ -144,7 +145,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre case STRING: String[][] stringValuesArray = blockValSet.getStringValuesMV(); for (int i = 0; i < length; i++) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i]); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKeyArray[i], _log2M); for (String value : stringValuesArray[i]) { hyperLogLog.offer(value); } @@ -168,7 +169,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre for (int i = 0; i < length; i++) { int[] intValues = intValuesArray[i]; for (int groupKey : groupKeysArray[i]) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M); for (int value : intValues) { hyperLogLog.offer(value); } @@ -180,7 +181,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre for (int i = 0; i < length; i++) { long[] longValues = longValuesArray[i]; for (int groupKey : groupKeysArray[i]) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M); for (long value : longValues) { hyperLogLog.offer(value); } @@ -192,7 +193,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre for (int i = 0; i < length; i++) { float[] floatValues = floatValuesArray[i]; for (int groupKey : groupKeysArray[i]) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M); for (float value : floatValues) { hyperLogLog.offer(value); } @@ -204,7 +205,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre for (int i = 0; i < length; i++) { double[] doubleValues = doubleValuesArray[i]; for (int groupKey : groupKeysArray[i]) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M); for (double value : doubleValues) { hyperLogLog.offer(value); } @@ -216,7 +217,7 @@ public class DistinctCountHLLMVAggregationFunction extends DistinctCountHLLAggre for (int i = 0; i < length; i++) { String[] stringValues = stringValuesArray[i]; for (int groupKey : groupKeysArray[i]) { - HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey); + HyperLogLog hyperLogLog = getDefaultHyperLogLog(groupByResultHolder, groupKey, _log2M); for (String value : stringValues) { hyperLogLog.offer(value); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java index 40050f6..9444948 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.query.aggregation.function; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import java.util.List; import java.util.Map; import org.apache.pinot.common.function.AggregationFunctionType; import org.apache.pinot.common.request.transform.TransformExpressionTree; @@ -32,8 +33,8 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; public class DistinctCountRawHLLAggregationFunction extends BaseSingleInputAggregationFunction<HyperLogLog, SerializedHLL> { private final DistinctCountHLLAggregationFunction _distinctCountHLLAggregationFunction; - public DistinctCountRawHLLAggregationFunction(String column) { - this(column, new DistinctCountHLLAggregationFunction(column)); + public DistinctCountRawHLLAggregationFunction(List<String> arguments) { + this(arguments.get(0), new DistinctCountHLLAggregationFunction(arguments)); } DistinctCountRawHLLAggregationFunction(String column, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java index 3712599..8365db9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java @@ -18,13 +18,14 @@ */ package org.apache.pinot.core.query.aggregation.function; +import java.util.List; import org.apache.pinot.common.function.AggregationFunctionType; public class DistinctCountRawHLLMVAggregationFunction extends DistinctCountRawHLLAggregationFunction { - public DistinctCountRawHLLMVAggregationFunction(String column) { - super(column, new DistinctCountHLLMVAggregationFunction(column)); + public DistinctCountRawHLLMVAggregationFunction(List<String> arguments) { + super(arguments.get(0), new DistinctCountHLLMVAggregationFunction(arguments)); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index fc06315..fa75eb2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1070,4 +1070,32 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet return true; }, 10_000L, "Failed to get results for case-insensitive queries"); } + + @Test + public void testDistinctCountHll() + throws Exception { + String query; + + // The Accurate value is 6538. + query = "SELECT distinctCount(FlightNum) FROM mytable "; + assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(), 6538); + assertEquals(postSqlQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), 6538); + + // Expected distinctCountHll with different log2m value from 2 to 19. The Accurate value is 6538. + long[] expectedResults = + new long[]{3504, 6347, 8877, 9729, 9046, 7672, 7538, 6993, 6649, 6651, 6553, 6525, 6459, 6523, 6532, 6544, 6538, 6539}; + + for (int i = 2; i < 20; i++) { + query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM mytable ", i); + assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(), expectedResults[i - 2]); + assertEquals(postSqlQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), + expectedResults[i - 2]); + } + + // Default HLL is set as log2m=12 + query = "SELECT distinctCountHLL(FlightNum) FROM mytable "; + assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(), expectedResults[10]); + assertEquals(postSqlQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), + expectedResults[10]); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
