dianfu commented on a change in pull request #16065:
URL: https://github.com/apache/flink/pull/16065#discussion_r646317153



##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -84,15 +84,9 @@ def normalize_pandas_result(it):
     return arrays
 
 
-def wrap_inputs_as_row(*args):
-    from pyflink.common.types import Row
+def wrap_input_series_as_datframe(*args):

Review comment:
       ```suggestion
   def wrap_input_series_as_dataframe(*args):
   ```

##########
File path: flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
##########
@@ -252,14 +253,19 @@ cdef class 
SimpleAggsHandleFunctionBase(AggsHandleFunctionBase):
                 else:
                     raise Exception(
                         "The args are not in the distinct data view, this 
should not happen.")
+            # Transfer InternalRow to Row in row-based operations

Review comment:
       ```suggestion
               # Convert InternalRow to Row
   ```

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -249,9 +251,8 @@ def extract_user_defined_aggregate_function(
             distinct_index = current_index
     else:
         distinct_index = -1
-    if user_defined_function_proto.takes_row_as_input:
-        local_variable_dict['wrap_inputs_as_row'] = wrap_inputs_as_row
-        func_str = "lambda value : [wrap_inputs_as_row(%s)]" % 
",".join(args_str)
+    if user_defined_function_proto.takes_row_as_input and not 
local_variable_dict:

Review comment:
       What about adding some comments?

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -171,8 +165,16 @@ def _extract_input(args) -> Tuple[str, Dict, List]:
     variable_dict.update(input_variable_dict)
     user_defined_funcs.extend(input_funcs)
     if user_defined_function_proto.takes_row_as_input:
-        variable_dict['wrap_inputs_as_row'] = wrap_inputs_as_row
-        func_str = "%s(wrap_inputs_as_row(%s))" % (func_name, func_args)
+        if input_variable_dict:

Review comment:
       It would be great to add some comments on this.

##########
File path: flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
##########
@@ -287,6 +293,10 @@ cdef class 
SimpleAggsHandleFunctionBase(AggsHandleFunctionBase):
             distinct_index = self._distinct_indexes[i]
             if distinct_index >= 0 and args in 
self._distinct_data_views[distinct_index]:
                 continue
+            # Transfer InternalRow to Row in row-based operations

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to