aljoscha commented on a change in pull request #13007:
URL: https://github.com/apache/flink/pull/13007#discussion_r461628431



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
##########
@@ -357,12 +358,7 @@ else if (inputType instanceof CompositeType) {
 
                // atomic in any case
                if (fieldNames == null) {
-                       int i = 0;
-                       String fieldName = ATOMIC_FIELD_NAME;
-                       while ((null != existingNames) && existingNames.contains(fieldName)) {
-                               fieldName = ATOMIC_FIELD_NAME + "_" + i++;
-                       }
-                       fieldNames = Collections.singletonList(fieldName);
+                       fieldNames = Collections.singletonList(getAtomicName(existingNames));

Review comment:
       Nice catch! 😃

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
##########
@@ -170,7 +170,13 @@ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnviron
         * @param aggregateFunction The AggregateFunction to register.
         * @param <T> The type of the output value.
         * @param <ACC> The type of aggregate accumulator.
+        *
+        * @deprecated Use {@link #createTemporarySystemFunction(String, UserDefinedFunction)} instead. Please

Review comment:
       I like the thorough Javadoc here!
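
       As an aside for readers hitting this deprecation, here is a minimal, hypothetical sketch of the migration the new Javadoc points to. `tableEnv` is assumed to be an existing `StreamTableEnvironment` and `MyAggregateFunction` a placeholder `AggregateFunction` implementation; neither name comes from the PR.

```java
// Hypothetical names: tableEnv is an existing StreamTableEnvironment,
// MyAggregateFunction is some user-defined AggregateFunction implementation.

// Old, now deprecated registration:
tableEnv.registerFunction("myAgg", new MyAggregateFunction());

// New registration via the unified function stack:
tableEnv.createTemporarySystemFunction("myAgg", MyAggregateFunction.class);
```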

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
##########
@@ -19,39 +19,65 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.util.Collector;
 
 /**
- * Base class for user-defined table aggregates.
- *
- * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing a series of
- * custom methods. A {@link TableAggregateFunction} needs at least three methods:
+ * Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar
+ * values of multiple rows to zero, one, or multiple rows. If an output row consists of only one field,
+ * the row can be omitted and a scalar value can be emitted. It will be wrapped into an implicit row
+ * by the runtime.
+ *
+ * <p>Similar to an {@link AggregateFunction}, the behavior of an {@link TableAggregateFunction} is centered
+ * around the concept of an accumulator. The accumulator is an intermediate data structure that stores
+ * the aggregated values until a final aggregation result is computed.
+ *
+ * <p>For each set of rows that needs to be aggregated, the runtime will create an empty accumulator
+ * by calling the {@link #createAccumulator()}. Subsequently, the {@code accumulate()} method of the

Review comment:
       ```suggestion
    * by calling {@link #createAccumulator()}. Subsequently, the {@code accumulate()} method of the
   ```
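
       Purely as context for the new Javadoc (and not part of this diff), a minimal table aggregate function in the spirit of the well-known Top2 example from the Flink documentation; class and field names here are illustrative only:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Emits the two largest values seen so far, together with their rank.
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accumulator> {

    // Mutable accumulator that stores the intermediate aggregation state.
    public static class Top2Accumulator {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    // Called by the runtime for every input row.
    public void accumulate(Top2Accumulator acc, Integer value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // Emits (value, rank) rows once the final result is requested.
    public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}
```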

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -164,6 +168,33 @@ public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
                return tableSchema.getFieldDataType(name);
        }
 
+       /**
+        * Returns the data types of the flat representation of the given data type.
+        */
+       public static List<DataType> flattenToDataTypes(DataType dataType) {
+               final LogicalType type = dataType.getLogicalType();
+               if (hasRoot(type, LogicalTypeRoot.DISTINCT_TYPE)) {
+                       return flattenToDataTypes(dataType.getChildren().get(0));
+               } else if (isCompositeType(type)) {
+                       return dataType.getChildren();
+               }
+               return Collections.singletonList(dataType);
+       }
+
+       /**
+        * Returns the names of the flat representation of the given data type.
+        */
+       public static List<String> flattenToNames(DataType dataType, List<String> existingNames) {
+               final LogicalType type = dataType.getLogicalType();

Review comment:
       nitpick: this could probably reuse `flattenToDataTypes()` from above for the actual traversal and then just convert the result to names.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
##########
@@ -206,10 +205,13 @@ private boolean verifyFunctionKind(
                        return false;
                }
 
-               // it would be nice to give a more meaningful exception when a scalar function is used instead
-               // of a table function and vice versa, but we can do that only once FLIP-51 is implemented
+               final FunctionKind kind = definition.getKind();
 
-               if (definition.getKind() == FunctionKind.SCALAR) {
+               if (kind == FunctionKind.TABLE) {

Review comment:
       Note to ask Timo: what exactly is this verifying?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
##########
@@ -164,6 +168,33 @@ public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
                return tableSchema.getFieldDataType(name);
        }
 
+       /**

Review comment:
       This only flattens the first level, right?
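
       Judging from the branches above (a composite type simply returns `dataType.getChildren()`), it looks like yes. A hypothetical usage sketch, not taken from the PR, that illustrates the first-level behaviour:

```java
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

public class FlattenSketch {
    public static void main(String[] args) {
        // A row with another row nested inside it.
        DataType nested = DataTypes.ROW(
            DataTypes.FIELD("a", DataTypes.INT()),
            DataTypes.FIELD("b", DataTypes.ROW(
                DataTypes.FIELD("c", DataTypes.STRING()))));

        // flattenToDataTypes() unwraps only the first level of the composite type.
        List<DataType> flat = DataTypeUtils.flattenToDataTypes(nested);

        System.out.println(flat.get(0)); // INT
        System.out.println(flat.get(1)); // the nested ROW<`c` STRING> stays a single field
    }
}
```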

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableAggregateFunction.java
##########
@@ -19,39 +19,65 @@
 package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.util.Collector;
 
 /**
- * Base class for user-defined table aggregates.
- *
- * <p>The behavior of a {@link TableAggregateFunction} can be defined by implementing a series of
- * custom methods. A {@link TableAggregateFunction} needs at least three methods:
+ * Base class for a user-defined table aggregate function. A user-defined table aggregate function maps scalar

Review comment:
       Again, I really like the thorough new Javadoc!

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
##########
@@ -77,44 +84,72 @@ public SqlAggFunction visit(CallExpression call) {
                        defaultMethod(call);
                }
 
-               FunctionDefinition def = call.getFunctionDefinition();
-               if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(def)) {
-                       return AGG_DEF_SQL_OPERATOR_MAPPING.get(def);
+               final FunctionDefinition definition = call.getFunctionDefinition();
+               if (AGG_DEF_SQL_OPERATOR_MAPPING.containsKey(definition)) {
+                       return AGG_DEF_SQL_OPERATOR_MAPPING.get(definition);
                }
-               if (BuiltInFunctionDefinitions.DISTINCT == def) {
+               if (BuiltInFunctionDefinitions.DISTINCT == definition) {
                        Expression innerAgg = call.getChildren().get(0);
                        return innerAgg.accept(this);
                }
 
-               if (isFunctionOfKind(call, AGGREGATE)) {
-                       AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) def;
-                       AggregateFunction aggFunc = aggDef.getAggregateFunction();
-                       FunctionIdentifier identifier = call.getFunctionIdentifier()
-                               .orElse(FunctionIdentifier.of(aggFunc.functionIdentifier()));
+               return createSqlAggFunction(
+                       call.getFunctionIdentifier().orElse(null),
+                       call.getFunctionDefinition());
+       }
+
+       private SqlAggFunction createSqlAggFunction(@Nullable FunctionIdentifier identifier, FunctionDefinition definition) {
+               // legacy
+               if (definition instanceof AggregateFunctionDefinition) {
+                       final AggregateFunctionDefinition aggDef = (AggregateFunctionDefinition) definition;

Review comment:
       That's the clean-code nerd in me, but I think all the if-else branches could become function calls, including the new code outside the if, i.e.:
   ```
   if (definition instanceof AggregateFunctionDefinition) {
       return translateLegacyAggregateFunction(...)
   } else if (...) {
       return translateLegacyTableAggregateFunction(...)
   }
   
   return translateFunction(...)
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -445,6 +445,15 @@ public Integer visit(StructuredType structuredType) {
                public Integer visit(DistinctType distinctType) {
                        return distinctType.getSourceType().accept(this);
                }
+
+               @Override

Review comment:
       I stumbled across this: what does `defaultMethod` actually do?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

