kgyrtkirk commented on code in PR #16682:
URL: https://github.com/apache/druid/pull/16682#discussion_r1675570590
##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java:
##########
@@ -118,12 +120,14 @@ public Aggregation toDruidAggregation(
}
if (inputType.is(ValueType.COMPLEX)) {
- aggregatorFactory = new HyperUniquesAggregatorFactory(
- aggregatorName,
- dimensionSpec.getOutputName(),
- false,
- true
- );
+ if (validateInputType(inputType)) {
Review Comment:
Why not `return null` here right away, after setting the planning error?
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java:
##########
@@ -116,26 +121,38 @@ public Aggregation toDruidAggregation(
);
}
- final DimensionSpec dimensionSpec;
-
- if (columnArg.isDirectColumnAccess()) {
- dimensionSpec = columnArg.getSimpleExtraction().toDimensionSpec(null,
inputType);
- } else {
- String virtualColumnName =
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
- columnArg,
- dataType
+ if (!inputType.is(ValueType.COMPLEX)) {
Review Comment:
Return early with an error here.
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java:
##########
@@ -154,19 +155,21 @@ public Aggregation toDruidAggregation(
}
if (inputType.is(ValueType.COMPLEX)) {
- aggregatorFactory = new HllSketchMergeAggregatorFactory(
- aggregatorName,
- dimensionSpec.getOutputName(),
- logK,
- tgtHllType,
-
- // For HllSketchMergeAggregatorFactory, stringEncoding is only
advisory to aid in detection of mismatched
- // merges. It does not affect the results of the aggregator. At
this point in the code, we do not know what
- // the input encoding of the original sketches was, so we set it
to the default.
- HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING,
- finalizeSketch ||
SketchQueryContext.isFinalizeOuterSketches(plannerContext),
- ROUND
- );
+ if (validateInputType(inputType)) {
Review Comment:
Why not return with the error right away?
##########
processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java:
##########
@@ -122,25 +118,43 @@ public BufferAggregator
factorizeBuffered(ColumnSelectorFactory metricFactory)
if (selector instanceof NilColumnValueSelector) {
return NoopBufferAggregator.instance();
}
- final Class classOfObject = selector.classOfObject();
- if (classOfObject.equals(Object.class) ||
HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
- return new HyperUniquesBufferAggregator(selector);
- }
-
- throw new IAE("Incompatible type for metric[%s], expected a HyperUnique,
got a %s", fieldName, classOfObject);
+ validateInputs(metricFactory.getColumnCapabilities(fieldName));
+ return new HyperUniquesBufferAggregator(selector);
}
@Override
public VectorAggregator factorizeVector(final VectorColumnSelectorFactory
selectorFactory)
{
- final ColumnCapabilities capabilities =
selectorFactory.getColumnCapabilities(fieldName);
- if (!Types.is(capabilities, ValueType.COMPLEX)) {
+ final ColumnCapabilities columnCapabilities =
selectorFactory.getColumnCapabilities(fieldName);
+ if (!Types.is(columnCapabilities, ValueType.COMPLEX)) {
return NoopVectorAggregator.instance();
} else {
+ validateInputs(columnCapabilities);
return new
HyperUniquesVectorAggregator(selectorFactory.makeObjectSelector(fieldName));
}
}
+ /**
+ * Validates whether the aggregator supports the input column type.
+ * Supported column types are complex types of hyperUnique,
preComputedHyperUnique, as well as UNKNOWN_COMPLEX.
+ * @param capabilities
+ */
+ private void validateInputs(@Nullable ColumnCapabilities capabilities)
+ {
+ if (capabilities != null) {
+ final ColumnType type = capabilities.toColumnType();
+ if (!(ColumnType.UNKNOWN_COMPLEX.equals(type) || TYPE.equals(type) ||
PRECOMPUTED_TYPE.equals(type))) {
Review Comment:
You could push the negation into this conditional, or return early if the types
are OK.
But why allow everything when `capabilities == null`? Does rejecting that case
cause any trouble? If not, I think it's better to throw an error in that case as
well.
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java:
##########
@@ -89,13 +90,17 @@ public Aggregation toDruidAggregation(
sketchSize = SketchAggregatorFactory.DEFAULT_MAX_SKETCH_SIZE;
}
- final AggregatorFactory aggregatorFactory;
+ AggregatorFactory aggregatorFactory = null;
final String aggregatorName = finalizeAggregations ?
Calcites.makePrefixedName(name, "a") : name;
if (columnArg.isDirectColumnAccess()
&& inputAccessor.getInputRowSignature()
.getColumnType(columnArg.getDirectColumn())
- .map(type -> type.is(ValueType.COMPLEX))
+ .map(type -> type.is(ValueType.COMPLEX) && (
Review Comment:
Note: `type.is(ValueType.COMPLEX)` is redundant and can simply be removed.
##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java:
##########
@@ -21,28 +21,68 @@
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.type.CastedLiteralOperandTypeCheckers;
import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringEncoding;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
+import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
import
org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
+import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Collections;
+/**
+ * Approximate count distinct aggregator using HLL sketches.
+ * Supported column types: String, Numeric, HLLSketchMerge, HLLSketchBuild.
+ */
public class HllSketchApproxCountDistinctSqlAggregator extends
HllSketchBaseSqlAggregator implements SqlAggregator
{
public static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL";
+
+ private static final SqlSingleOperandTypeChecker
AGGREGATED_COLUMN_TYPE_CHECKER = OperandTypes.or(
+ OperandTypes.STRING,
+ OperandTypes.NUMERIC,
+ RowSignatures.complexTypeChecker(HllSketchMergeAggregatorFactory.TYPE),
+ RowSignatures.complexTypeChecker(HllSketchBuildAggregatorFactory.TYPE)
+ );
+
private static final SqlAggFunction FUNCTION_INSTANCE =
OperatorConversions.aggregatorBuilder(NAME)
- .operandNames("column", "lgK", "tgtHllType")
- .operandTypes(SqlTypeFamily.ANY,
SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)
.operandTypeInference(InferTypes.VARCHAR_1024)
- .requiredOperandCount(1)
- .literalOperands(1, 2)
+ .operandTypeChecker(
+ OperandTypes.or(
+ // APPROX_COUNT_DISTINCT_DS_HLL(column)
+
OperandTypes.and(AGGREGATED_COLUMN_TYPE_CHECKER,
OperandTypes.family(SqlTypeFamily.ANY)),
Review Comment:
I wonder why we need to AND with a family of `ANY`.
Can't this simply be `AGGREGATED_COLUMN_TYPE_CHECKER`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]