dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r547015112
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation
+  bool used_in_row_based_operation = 4;
Review comment:
```suggestion
  bool takes_row_as_input = 4;
```
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation
Review comment:
```suggestion
  // Whether the UDF takes a row as input instead of each column of the row
```
##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False

     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)

+    def alias(self, *alias_names: str):
+        self._alias_names = alias_names
+        return self
+
+    def set_used_in_row_based_operation(self):
Review comment:
```suggestion
    def _set_takes_row_as_input(self):
```
##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False

     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)

+    def alias(self, *alias_names: str):
Review comment:
```suggestion
    def _alias(self, *alias_names: str):
```
##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation
+  bool used_in_row_based_operation = 4;
Review comment:
Why is there no such flag for aggregate functions?
##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1224,26 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func.set_used_in_row_based_operation()
+            func = self._to_expr(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
+
+    def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        group_window_field = self._j_table.getClass().getDeclaredField("window")
+        group_window_field.setAccessible(True)
+        j_group_window = group_window_field.get(self._j_table)
+        j_time_field = j_group_window.getTimeField()
+        fields_without_window = without_columns(j_time_field)
+        if hasattr(func, "_alias_names"):
+            alias_names = getattr(func, "_alias_names")
+            func_expression = func(fields_without_window).alias(*alias_names)
+        else:
+            func_expression = func(fields_without_window)
+        return func_expression
+
Review comment:
Could you also update the example in
FlatAggregateTable.select/AggregatedTable.select to use row-based operations?
##########
File path: flink-python/pyflink/table/table.py
##########
@@ -769,6 +771,8 @@ def map(self, func: Union[str, Expression]) -> 'Table':
         >>> add = udf(lambda x: Row(x + 1, x * x), result_type=DataTypes.ROW(
         ...     [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())]))
         >>> tab.map(add(tab.a)).alias("a, b")
+        >>> # take all the columns as inputs
+        >>> tab.map(add)
Review comment:
I guess the example doesn't apply anymore, as the user-defined function should take a Row as input. The other examples should be updated as well.
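For illustration, a plain-Python sketch (hypothetical, not pyflink code) of the pattern this comment asks for: when all columns are passed via `tab.map(add)`, the UDF should accept one row object rather than one argument per column. A plain tuple stands in for pyflink's Row type here.

```python
# Hypothetical sketch: a map-style UDF that takes a whole row instead of
# individual columns. The "row" is a plain tuple standing in for Row.
def add(row):
    x = row[0]
    # emit a new two-field row, mirroring the Row(x + 1, x * x) example
    return (x + 1, x * x)

result = add((3,))  # the single input column arrives inside the row
```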
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -43,6 +43,15 @@ def wrap_pandas_result(it):
     return arrays


+def wrap_inputs_to_row(*args):
Review comment:
```suggestion
def wrap_inputs_as_row(*args):
```
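For context, a minimal sketch of what a helper like this might do (simplified and hypothetical; a plain tuple stands in for pyflink's Row type, and the real implementation lives in operation_utils.py):

```python
# Sketch: bundle the positional column values into one row-like object,
# so a UDF declared to take a row receives a single argument instead of
# one positional argument per column.
def wrap_inputs_as_row(*args):
    return tuple(args)

row = wrap_inputs_as_row(1, 2.0, "x")
```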
##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -460,7 +470,8 @@ def _create_judf(self, serialized_func, j_input_types, j_function_kind):
             j_result_type,
             j_function_kind,
             self._deterministic,
-            _get_python_env())
+            _get_python_env(),
+            self._used_in_row_based_operation)
Review comment:
What about putting `self._used_in_row_based_operation` after `self._deterministic`?
##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False

     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)

+    def alias(self, *alias_names: str):
+        self._alias_names = alias_names
+        return self
+
+    def set_used_in_row_based_operation(self):
+        self._used_in_row_based_operation = True
+        return self
+
     def java_user_defined_function(self):
Review comment:
```suggestion
    def _java_user_defined_function(self):
```
##########
File path: flink-python/pyflink/table/tests/test_row_based_operation.py
##########
@@ -120,14 +117,16 @@ def test_aggregate_with_pandas_udaf(self):
             ['a', 'b', 'c'],
             [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT()])
         self.t_env.register_table_sink("Results", table_sink)
-        pandas_udaf = udaf(lambda a: (a.mean(), a.max()),
+        pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                            result_type=DataTypes.ROW(
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
                                 DataTypes.FIELD("b", DataTypes.INT())]),
                            func_type="pandas")
-        t.group_by(t.a) \
-            .aggregate(pandas_udaf(t.b).alias("c", "d")) \
-            .select("a, c, d").execute_insert("Results") \
+        t.select(t.a, t.b) \
+            .group_by(t.a) \
+            .aggregate(pandas_udaf) \
Review comment:
Could you improve `pandas_udaf` so that it can also access the group key?
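A plain-Python sketch (hypothetical, no pandas) of what this would mean: with row-based aggregation the UDAF sees every column of the group, including the key column `a`, so it can emit the key alongside the aggregates.

```python
# Sketch: an aggregate that receives all rows of one group as a list of
# tuples (key column "a", value column "b") and can therefore also read
# the group key, not only the aggregated column.
def udaf_with_key(rows):
    a_values = [r[0] for r in rows]  # group key column "a"
    b_values = [r[1] for r in rows]  # value column "b"
    # emit (key, mean(b), max(b))
    return (a_values[0], sum(b_values) / len(b_values), max(b_values))

out = udaf_with_key([(1, 2.0), (1, 4.0)])
```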
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
##########
@@ -48,6 +48,7 @@
     private final PythonFunctionKind pythonFunctionKind;
     private final boolean deterministic;
     private final PythonEnv pythonEnv;
+    private final boolean usedInRowBasedOperation;
Review comment:
I guess we could also add row-based support to join_lateral/left_outer_join_lateral? I'm fine with doing it in a separate PR if you want.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]