dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r546478883



##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1098,25 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._to_expr(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        group_keys_field = self._j_table.getClass().getDeclaredField("groupKeys")
+        group_keys_field.setAccessible(True)
+        j_group_keys = group_keys_field.get(self._j_table)
+        fields_without_keys = without_columns(
+            j_group_keys[0], *([j_group_keys[i] for i in range(1, len(j_group_keys))]))
+        if hasattr(func, "alias_names"):
+            alias_names = getattr(func, "alias_names")
+            func_expression = func(fields_without_keys).alias(*alias_names)

Review comment:
       I think the keys should also be passed to the row-based function.
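
To make the concern concrete, here is a minimal plain-Python sketch. It does not use the PyFlink API: `group_and_aggregate`, `describe`, `include_keys`, and the sample rows are all hypothetical names chosen for illustration. It contrasts a row-based aggregate that only sees the non-key columns with one that also receives the group keys.

```python
from collections import defaultdict


def group_and_aggregate(rows, key_fields, agg, include_keys):
    """Group dict rows by key_fields and call agg once per group.

    Hypothetical helper: with include_keys=False the key columns are
    stripped before agg sees the rows, loosely mirroring the idea of
    excluding the group keys from the function input.
    """
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[k] for k in key_fields)
        if include_keys:
            groups[key].append(dict(row))
        else:
            groups[key].append(
                {k: v for k, v in row.items() if k not in key_fields})
    return {key: agg(group) for key, group in groups.items()}


def describe(group):
    # Row-based function: report the visible columns and the sum of "v".
    return sorted(group[0].keys()), sum(r["v"] for r in group)


rows = [{"k": "a", "v": 1}, {"k": "a", "v": 2}, {"k": "b", "v": 5}]
without_keys = group_and_aggregate(rows, ["k"], describe, include_keys=False)
with_keys = group_and_aggregate(rows, ["k"], describe, include_keys=True)
```

In the `include_keys=False` case the function cannot tell which group it is processing; with `include_keys=True` the key column is available alongside the values.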

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -817,6 +828,8 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
             >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+            >>> # input all columns
+            >>> tab.aggregate(agg.alias("a, b")).select("a, b")

Review comment:
       For row-based operations, the row function should receive a Row/DataFrame 
as input. Otherwise, the field name information is lost. This becomes a problem 
when there are hundreds of columns: the function is hard to use because users 
have to give each column a meaningful name.
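
As a rough illustration of the difference, the plain-Python sketch below uses a `namedtuple` as a hypothetical stand-in for a Row type that carries field names. `Row`, `ratio_positional`, `ratio_row`, and the column names are assumptions made for this example and are not part of PyFlink.

```python
from collections import namedtuple

# Hypothetical stand-in for a Row type that carries field names.
Row = namedtuple("Row", ["user_id", "clicks", "views"])


def ratio_positional(a, b, c):
    # Positional style: the author must remember that b is clicks and
    # c is views, and must name every column in the signature.
    return b / c


def ratio_row(row):
    # Row style: fields are addressed by name, so the function does not
    # depend on column order or on the total number of columns.
    return row.clicks / row.views


record = Row(user_id=7, clicks=3, views=12)
positional_result = ratio_positional(*record)
row_result = ratio_row(record)
```

With three columns both styles are manageable; with hundreds of columns, only the row style stays readable, since the function names just the fields it actually uses.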

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -792,6 +798,8 @@ def flat_map(self, func: Union[str, Expression]) -> 'Table':
             ...     for s in string.split(","):
             ...         yield x, s
             >>> tab.flat_map(split(tab.a, table.b))
+            >>> # input all columns

Review comment:
       ```suggestion
               >>> # take all the columns as inputs
   ```





