dianfu commented on a change in pull request #16065:
URL: https://github.com/apache/flink/pull/16065#discussion_r646317153
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -84,15 +84,9 @@ def normalize_pandas_result(it):
return arrays
-def wrap_inputs_as_row(*args):
- from pyflink.common.types import Row
+def wrap_input_series_as_datframe(*args):
Review comment:
```suggestion
def wrap_input_series_as_dataframe(*args):
```
##########
File path: flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
##########
@@ -252,14 +253,19 @@ cdef class
SimpleAggsHandleFunctionBase(AggsHandleFunctionBase):
else:
raise Exception(
"The args are not in the distinct data view, this
should not happen.")
+ # Transfer InternalRow to Row in row-based operations
Review comment:
```suggestion
# Convert InternalRow to Row
```
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -249,9 +251,8 @@ def extract_user_defined_aggregate_function(
distinct_index = current_index
else:
distinct_index = -1
- if user_defined_function_proto.takes_row_as_input:
- local_variable_dict['wrap_inputs_as_row'] = wrap_inputs_as_row
- func_str = "lambda value : [wrap_inputs_as_row(%s)]" %
",".join(args_str)
+ if user_defined_function_proto.takes_row_as_input and not
local_variable_dict:
Review comment:
What about adding some comments?
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -171,8 +165,16 @@ def _extract_input(args) -> Tuple[str, Dict, List]:
variable_dict.update(input_variable_dict)
user_defined_funcs.extend(input_funcs)
if user_defined_function_proto.takes_row_as_input:
- variable_dict['wrap_inputs_as_row'] = wrap_inputs_as_row
- func_str = "%s(wrap_inputs_as_row(%s))" % (func_name, func_args)
+ if input_variable_dict:
Review comment:
It would be great to add some comments on this.
##########
File path: flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
##########
@@ -287,6 +293,10 @@ cdef class
SimpleAggsHandleFunctionBase(AggsHandleFunctionBase):
distinct_index = self._distinct_indexes[i]
if distinct_index >= 0 and args in
self._distinct_data_views[distinct_index]:
continue
+ # Transfer InternalRow to Row in row-based operations
Review comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]