LakshSingla commented on code in PR #16682:
URL: https://github.com/apache/druid/pull/16682#discussion_r1671612578


##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -142,6 +150,26 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     );
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      boolean isUnsupportedComplexType = ValueType.COMPLEX.equals(type.getType()) &&

Review Comment:
   Should the variable be named `isSupportedComplexType`?



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java:
##########
@@ -88,13 +90,15 @@ public Aggregation toDruidAggregation(
       return null;
     }
 
-    final AggregatorFactory aggregatorFactory;
+    AggregatorFactory aggregatorFactory = null;
     final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
 
     if (arg.isDirectColumnAccess()
         && inputAccessor.getInputRowSignature()
             .getColumnType(arg.getDirectColumn())
-            .map(type -> type.is(ValueType.COMPLEX))
+            .map(type -> type.is(ValueType.COMPLEX)
+                         && (Objects.equals(type.getComplexTypeName(), HyperUniquesAggregatorFactory.TYPE.getComplexTypeName()) || Objects.equals(type.getComplexTypeName(), HyperUniquesAggregatorFactory.PRECOMPUTED_TYPE.getComplexTypeName()))
+            )

Review Comment:
   This conditional is too long. Please extract it into a meaningfully named variable so it is easy to follow.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -142,6 +150,26 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     );
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      boolean isUnsupportedComplexType = ValueType.COMPLEX.equals(type.getType()) &&
+                                          (HllSketchModule.TYPE_NAME.equals(type.getComplexTypeName()) ||
+                                           HllSketchModule.BUILD_TYPE_NAME.equals(type.getComplexTypeName()));
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !isUnsupportedComplexType) {

Review Comment:
   Stylistic nit: each conditional has the form `!x && !y && !z`, which is still hard to follow. Either rewrite the individual terms so that it reads like `a && b && c`, or restate the condition as `!(x || y || z)`.
   
   Regarding the condition itself, the check should be simple: if the column type is a complex type and is not one of the following, then fail:
   a. TYPE (MERGE_TYPE_NAME)
   b. HllSketchModule.TYPE_NAME
   c. HllSketchModule.BUILD_TYPE_NAME
   d. Maybe we should also let UNKNOWN complex types pass.
   
   I didn't understand why `TYPE` is checked separately from the `isUnsupportedComplexType` variable (which I feel should be `isSupportedComplexType` instead).
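A minimal sketch of the check described above, using a positively named `isSupportedComplexType` so the failure condition is a single `!isSupportedComplexType` instead of `!x && !y && !z`. This is plain, self-contained Java, not Druid code: the class `HllMergeInputCheck` and its methods are hypothetical, the three type names are assumed to be the values of `HllSketchModule.TYPE_NAME`, `BUILD_TYPE_NAME` and `MERGE_TYPE_NAME`, and a `null` name stands in for an unknown complex type.

```java
import java.util.Set;

// Hypothetical stand-in for HllSketchMergeAggregatorFactory#validateInputs; not Druid code.
final class HllMergeInputCheck
{
  // Assumed values of HllSketchModule.TYPE_NAME, BUILD_TYPE_NAME and MERGE_TYPE_NAME.
  static final Set<String> SUPPORTED_COMPLEX_TYPE_NAMES =
      Set.of("HLLSketch", "HLLSketchBuild", "HLLSketchMerge");

  private HllMergeInputCheck()
  {
  }

  /**
   * Returns true unless the column is complex and its type is neither an HLL sketch type nor unknown.
   *
   * @param isComplex       whether the input column is a COMPLEX column
   * @param complexTypeName the complex type name, or null when the complex type is unknown
   */
  static boolean isSupportedComplexType(boolean isComplex, String complexTypeName)
  {
    // Non-complex columns are outside the rule; unknown complex types (null) are let through.
    // The null check runs first because Set.of(...).contains(null) throws NullPointerException.
    return !isComplex
           || complexTypeName == null
           || SUPPORTED_COMPLEX_TYPE_NAMES.contains(complexTypeName);
  }

  static void validateInputs(boolean isComplex, String complexTypeName)
  {
    if (!isSupportedComplexType(isComplex, complexTypeName)) {
      throw new UnsupportedOperationException(
          String.format("Complex columns with type [%s] are not supported by this aggregator.", complexTypeName)
      );
    }
  }
}
```

Because the merge type name sits in the same set as the other two, this sketch needs no separate `TYPE` check.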



##########
processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java:
##########
@@ -103,44 +103,55 @@ public HyperUniquesAggregatorFactory(
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
+    validateInputs(metricFactory.getColumnCapabilities(fieldName));
     BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
     if (selector instanceof NilColumnValueSelector) {
       return NoopAggregator.instance();
     }
-    final Class classOfObject = selector.classOfObject();
-    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
-      return new HyperUniquesAggregator(selector);
-    }
-
-    throw new IAE("Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject);
+    return new HyperUniquesAggregator(selector);
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
   {
+    validateInputs(metricFactory.getColumnCapabilities(fieldName));
     BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
     if (selector instanceof NilColumnValueSelector) {
       return NoopBufferAggregator.instance();
     }
-    final Class classOfObject = selector.classOfObject();
-    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
-      return new HyperUniquesBufferAggregator(selector);
-    }
-
-    throw new IAE("Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject);
+    return new HyperUniquesBufferAggregator(selector);
   }
 
   @Override
   public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory)
   {
-    final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
-    if (!Types.is(capabilities, ValueType.COMPLEX)) {
+    final ColumnCapabilities columnCapabilities = selectorFactory.getColumnCapabilities(fieldName);
+    validateInputs(columnCapabilities);
+
+    if (!Types.is(columnCapabilities, ValueType.COMPLEX)) {
       return NoopVectorAggregator.instance();
     } else {
       return new HyperUniquesVectorAggregator(selectorFactory.makeObjectSelector(fieldName));
     }
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !PRECOMPUTED_TYPE.equals(type)) {

Review Comment:
   It is easier to follow than the first one, maybe because of the shorter variable names.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java:
##########
@@ -154,19 +161,23 @@ public Aggregation toDruidAggregation(
       }
 
       if (inputType.is(ValueType.COMPLEX)) {
-        aggregatorFactory = new HllSketchMergeAggregatorFactory(
-            aggregatorName,
-            dimensionSpec.getOutputName(),
-            logK,
-            tgtHllType,
-
-            // For HllSketchMergeAggregatorFactory, stringEncoding is only advisory to aid in detection of mismatched
-            // merges. It does not affect the results of the aggregator. At this point in the code, we do not know what
-            // the input encoding of the original sketches was, so we set it to the default.
-            HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING,
-            finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext),
-            ROUND
-        );
+        if (HllSketchMergeAggregatorFactory.TYPE.equals(inputType) ||
+            HllSketchModule.TYPE_NAME.equals(inputType.getComplexTypeName()) ||
+            HllSketchModule.BUILD_TYPE_NAME.equals(inputType.getComplexTypeName())) {

Review Comment:
   This is a clean conditional and is easy to follow. 



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java:
##########
@@ -118,12 +122,14 @@ public Aggregation toDruidAggregation(
       }
 
       if (inputType.is(ValueType.COMPLEX)) {
-        aggregatorFactory = new HyperUniquesAggregatorFactory(
-            aggregatorName,
-            dimensionSpec.getOutputName(),
-            false,
-            true
-        );
+        if ((Objects.equals(inputType.getComplexTypeName(), HyperUniquesAggregatorFactory.TYPE.getComplexTypeName()) || Objects.equals(inputType.getComplexTypeName(), HyperUniquesAggregatorFactory.PRECOMPUTED_TYPE.getComplexTypeName()))) {

Review Comment:
   It seems we are reusing part of the conditional above. Consider extracting it into a new method and having a single check like:
   
   ```
   boolean validateInputComplexTypeName(String inputComplexTypeName)
   ```
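A sketch of the suggested helper, assuming `HyperUniquesAggregatorFactory.TYPE` and `PRECOMPUTED_TYPE` carry the complex type names `hyperUnique` and `preComputedHyperUnique`. The class, the constants and `isHyperUniqueColumn` are hypothetical, and `Optional<String>` simplifies the `Optional<ColumnType>` that the row-signature lookup returns; the point is that both call sites, the direct-column-access branch and the `inputType.is(ValueType.COMPLEX)` branch, would share one named check.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical illustration of the helper suggested in the review; not Druid code.
final class HyperUniqueTypeCheck
{
  // Assumed complex type names of HyperUniquesAggregatorFactory.TYPE and PRECOMPUTED_TYPE.
  static final String HYPER_UNIQUE = "hyperUnique";
  static final String PRECOMPUTED_HYPER_UNIQUE = "preComputedHyperUnique";

  private HyperUniqueTypeCheck()
  {
  }

  /** True if the complex type name is one that the hyperUnique aggregator can consume. */
  static boolean validateInputComplexTypeName(String inputComplexTypeName)
  {
    return Objects.equals(inputComplexTypeName, HYPER_UNIQUE)
           || Objects.equals(inputComplexTypeName, PRECOMPUTED_HYPER_UNIQUE);
  }

  /**
   * Named replacement for the long inline condition: the Optional holds the column's complex
   * type name and is empty when the column is missing from the signature or is not complex.
   */
  static boolean isHyperUniqueColumn(Optional<String> complexTypeName)
  {
    return complexTypeName.map(HyperUniqueTypeCheck::validateInputComplexTypeName).orElse(false);
  }
}
```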



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -142,6 +150,26 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     );
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      boolean isUnsupportedComplexType = ValueType.COMPLEX.equals(type.getType()) &&
+                                          (HllSketchModule.TYPE_NAME.equals(type.getComplexTypeName()) ||
+                                           HllSketchModule.BUILD_TYPE_NAME.equals(type.getComplexTypeName()));
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !isUnsupportedComplexType) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.UNSUPPORTED)
+                            .build("Using aggregator [%s] is not supported for complex columns with type [%s].",
+                                   getIntermediateType().getComplexTypeName(), type);

Review Comment:
   nit: formatting
   We should either put all the arguments on the same line as the function call and the closing parenthesis, like `f(a,b,c)`, or put everything on a separate line, like:
   ```
   f(
       a,
       b,
       c
   )
   ```
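Applied to the error message in this hunk, the second layout would look roughly like this. `String.format` stands in for the `DruidException` builder so the sketch runs on its own; the class and method names are hypothetical.

```java
// Hypothetical illustration of the one-argument-per-line layout; not Druid code.
final class ErrorMessageFormatting
{
  private ErrorMessageFormatting()
  {
  }

  static String unsupportedTypeMessage(String aggregatorTypeName, String columnType)
  {
    // Each argument on its own line, with the closing parenthesis aligned to the start of the call.
    return String.format(
        "Using aggregator [%s] is not supported for complex columns with type [%s].",
        aggregatorTypeName,
        columnType
    );
  }
}
```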



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -142,6 +150,26 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     );
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      boolean isUnsupportedComplexType = ValueType.COMPLEX.equals(type.getType()) &&
+                                          (HllSketchModule.TYPE_NAME.equals(type.getComplexTypeName()) ||
+                                           HllSketchModule.BUILD_TYPE_NAME.equals(type.getComplexTypeName()));
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !isUnsupportedComplexType) {

Review Comment:
   Why is just `(!isSupportedComplexType)` not enough for the condition? 



##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java:
##########
@@ -508,6 +524,33 @@ public void testApproxCountDistinctHllSketchIsRounded()
     );
   }
 
+  @Test
+  public void testApproxCountDistinctOnUnsupportedComplexColumn()
+  {
+    try {
+      testQuery("SELECT COUNT(distinct double_first_added) FROM druid.wikipedia_first_last", ImmutableList.of(), ImmutableList.of());
+      Assert.fail("query planning should fail");

Review Comment:
   You should use the `assertQueryIsUnplannable` method instead of `testQuery` to verify planning errors; it is cleaner.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java:
##########
@@ -104,20 +99,40 @@ public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
   {
-    ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
-    if (capabilities != null && capabilities.isArray()) {
-      throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
-    }
+    validateInputs(metricFactory.getColumnCapabilities(fieldName));
     BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
     return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
   }
 
   @Override
   public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
   {
+    validateInputs(selectorFactory.getColumnCapabilities(fieldName));
     return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls());
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      if (capabilities.isArray() || (capabilities.is(ValueType.COMPLEX) && !(

Review Comment:
   nit: Let's break out the second half of the conditional into a variable and write the condition like:
   `if (capabilities.isArray() || !supportedComplexTypes) { throw ... }`
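A sketch of the suggested shape, with the second half of the conditional broken out into `isSupportedComplexType`. The list of supported theta sketch type names is cut off in the diff, so the sketch takes it as a parameter rather than guessing it; the class name, the parameters and the choice to reject an unknown (`null`) type name are assumptions, not what the PR does.

```java
import java.util.Set;

// Hypothetical stand-in for SketchAggregatorFactory#validateInputs; not Druid code.
final class ThetaInputCheck
{
  private ThetaInputCheck()
  {
  }

  static void validateInputs(
      boolean isArray,
      boolean isComplex,
      String complexTypeName,
      Set<String> supportedComplexTypeNames
  )
  {
    // Second half of the original conditional, broken out into a named variable.
    // A null (unknown) complex type name is treated as unsupported in this sketch.
    final boolean isSupportedComplexType =
        !isComplex || (complexTypeName != null && supportedComplexTypeNames.contains(complexTypeName));

    if (isArray || !isSupportedComplexType) {
      throw new IllegalArgumentException("Unsupported input column type for theta sketch");
    }
  }
}
```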



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/BuiltinApproxCountDistinctSqlAggregator.java:
##########
@@ -135,6 +141,14 @@ public Aggregation toDruidAggregation(
       }
     }
 
+    if (aggregatorFactory == null) {
+      plannerContext.setPlanningError("Using APPROX_COUNT_DISTINCT() or enabling approximation with COUNT(DISTINCT) "
+                                      + "is not supported for %s column. You can disable approximation and use "
+                                      + "COUNT(DISTINCT %s) and run the query again.",

Review Comment:
   ```suggestion
         plannerContext.setPlanningError("Using APPROX_COUNT_DISTINCT() or enabling approximation with COUNT(DISTINCT) "
                                         + "is not supported for %s column. You can disable approximation, use "
                                         + "COUNT(DISTINCT %s) and rerun the query.",
   ```



##########
processing/src/main/java/org/apache/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java:
##########
@@ -103,44 +103,55 @@ public HyperUniquesAggregatorFactory(
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
+    validateInputs(metricFactory.getColumnCapabilities(fieldName));
     BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
     if (selector instanceof NilColumnValueSelector) {
       return NoopAggregator.instance();
     }
-    final Class classOfObject = selector.classOfObject();
-    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
-      return new HyperUniquesAggregator(selector);
-    }
-
-    throw new IAE("Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject);
+    return new HyperUniquesAggregator(selector);
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
   {
+    validateInputs(metricFactory.getColumnCapabilities(fieldName));
     BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
     if (selector instanceof NilColumnValueSelector) {
       return NoopBufferAggregator.instance();
     }
-    final Class classOfObject = selector.classOfObject();
-    if (classOfObject.equals(Object.class) || HyperLogLogCollector.class.isAssignableFrom(classOfObject)) {
-      return new HyperUniquesBufferAggregator(selector);
-    }
-
-    throw new IAE("Incompatible type for metric[%s], expected a HyperUnique, got a %s", fieldName, classOfObject);
+    return new HyperUniquesBufferAggregator(selector);
   }
 
   @Override
   public VectorAggregator factorizeVector(final VectorColumnSelectorFactory selectorFactory)
   {
-    final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
-    if (!Types.is(capabilities, ValueType.COMPLEX)) {
+    final ColumnCapabilities columnCapabilities = selectorFactory.getColumnCapabilities(fieldName);
+    validateInputs(columnCapabilities);
+
+    if (!Types.is(columnCapabilities, ValueType.COMPLEX)) {
       return NoopVectorAggregator.instance();
     } else {
       return new HyperUniquesVectorAggregator(selectorFactory.makeObjectSelector(fieldName));
     }
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !PRECOMPUTED_TYPE.equals(type)) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.UNSUPPORTED)
+                            .build("Using aggregator [%s] is not supported for complex columns with type [%s].",
+                                   getIntermediateType().getComplexTypeName(), type);

Review Comment:
   nit: Incorrect formatting. When making a function call, put either all the arguments on the same line or each argument on its own line.



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java:
##########


Review Comment:
   I am unsure about the changes in this class. Please get them vetted by someone more familiar with the approach as well.



##########
extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java:
##########
@@ -142,6 +150,26 @@ public VectorAggregator 
factorizeVector(VectorColumnSelectorFactory selectorFact
     );
   }
 
+  /**
+   * Validates whether the aggregator supports the input column type.
+   * @param capabilities
+   */
+  private void validateInputs(@Nullable ColumnCapabilities capabilities)
+  {
+    if (capabilities != null) {
+      final ColumnType type = capabilities.toColumnType();
+      boolean isUnsupportedComplexType = ValueType.COMPLEX.equals(type.getType()) &&
+                                          (HllSketchModule.TYPE_NAME.equals(type.getComplexTypeName()) ||
+                                           HllSketchModule.BUILD_TYPE_NAME.equals(type.getComplexTypeName()));
+      if (!ColumnType.UNKNOWN_COMPLEX.equals(type) && !TYPE.equals(type) && !isUnsupportedComplexType) {

Review Comment:
   Also, the TYPE check can be done in the section above, right? Why do we need to distinguish it from the supported/unsupported column types?


