dianfu commented on a change in pull request #13843:
URL: https://github.com/apache/flink/pull/13843#discussion_r514680464



##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
     return Row(*fields)
 
 
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+    # for built in functions we extract the data view specs from their accumulator
+    i = -1
+    extracted_specs = []
+    for field in accumulator:
+        i += 1
+        # TODO: infer the coder from the input types and output type of the built-in functions
+        if isinstance(field, MapView):
+            extracted_specs.append(MapViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder(), 
PickleCoder()))
+        elif isinstance(field, ListView):
+            extracted_specs.append(ListViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+    return extracted_specs
+
+
 def extract_data_view_specs(udfs):
     extracted_udf_data_view_specs = []
+    current_index = -1
     for udf in udfs:
+        current_index += 1
         udf_data_view_specs_proto = udf.specs
-        if udf_data_view_specs_proto is None:
-            extracted_udf_data_view_specs.append([])
+        if not udf_data_view_specs_proto:
+            if is_built_in_function(udf.payload):
+                bulit_in_function = load_aggregate_function(udf.payload)

Review comment:
       ```suggestion
                   built_in_function = load_aggregate_function(udf.payload)
   ```
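
For context, a minimal sketch of what this helper would extract for a built-in function's accumulator; the sample accumulator below is made up for illustration, and the import paths and no-arg constructors of `MapView`/`ListView` are assumptions on my side:

```python
from pyflink.table.data_view import ListView, MapView
from pyflink.fn_execution.aggregate import extract_data_view_specs_from_accumulator

# hypothetical accumulator of a built-in aggregate: [plain counter, MapView, ListView]
accumulator = [0, MapView(), ListView()]

# with current_index=2 this should yield two specs named after the pattern
# "builtInAgg%df%d":
#   MapViewSpec("builtInAgg2f1", 1, PickleCoder(), PickleCoder())
#   ListViewSpec("builtInAgg2f2", 2, PickleCoder())
specs = extract_data_view_specs_from_accumulator(2, accumulator)
```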

##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
     return Row(*fields)
 
 
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+    # for built in functions we extract the data view specs from their accumulator
+    i = -1
+    extracted_specs = []
+    for field in accumulator:
+        i += 1
+        # TODO: infer the coder from the input types and output type of the built-in functions
+        if isinstance(field, MapView):
+            extracted_specs.append(MapViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder(), 
PickleCoder()))
+        elif isinstance(field, ListView):
+            extracted_specs.append(ListViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+    return extracted_specs
+
+
 def extract_data_view_specs(udfs):
     extracted_udf_data_view_specs = []
+    current_index = -1
     for udf in udfs:
+        current_index += 1
         udf_data_view_specs_proto = udf.specs
-        if udf_data_view_specs_proto is None:
-            extracted_udf_data_view_specs.append([])
+        if not udf_data_view_specs_proto:
+            if is_built_in_function(udf.payload):
+                bulit_in_function = load_aggregate_function(udf.payload)
+                accumulator = bulit_in_function.create_accumulator()
+                extracted_udf_data_view_specs.append(
+                    extract_data_view_specs_from_accumulator(current_index, accumulator))
+            else:
+                extracted_udf_data_view_specs.append([])
+            continue

Review comment:
       What about enclosing the following code in an *else*? Then we can remove the *continue*, which will make the code more readable.
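
For illustration, the restructured code could look roughly like the following; the logic that currently comes after the `continue` is not shown in this hunk, so it is only indicated with a placeholder comment:

```python
if not udf_data_view_specs_proto:
    if is_built_in_function(udf.payload):
        built_in_function = load_aggregate_function(udf.payload)
        accumulator = built_in_function.create_accumulator()
        extracted_udf_data_view_specs.append(
            extract_data_view_specs_from_accumulator(current_index, accumulator))
    else:
        extracted_udf_data_view_specs.append([])
else:
    # the handling of non-empty specs (currently placed after the
    # `continue`) would move here unchanged
    ...
```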

##########
File path: flink-python/pyflink/table/tests/test_aggregate.py
##########
@@ -256,6 +258,59 @@ def test_double_aggregate(self):
             .select("my_count(a) as a, my_sum(b) as b")
         assert_frame_equal(result.to_pandas(), pd.DataFrame([[3, 12]], columns=['a', 'b']))
 
+    def test_mixed_with_built_in_functions_with_retract(self):

Review comment:
       Could we merge the following two test cases into one test case or even merge them into an existing test case?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupAggregateRule.java
##########
@@ -67,13 +67,14 @@ public boolean matches(RelOptRuleCall call) {
                        aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
                boolean existPandasFunction =
                        aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.PANDAS));
-               boolean existJavaFunction =
-                       aggCalls.stream().anyMatch(x -> !PythonUtil.isPythonAggregate(x, null));
+               boolean existUserDefinedJavaFunction =

Review comment:
       ```suggestion
                boolean existJavaUserDefinedFunction =
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonAggregate.scala
##########
@@ -158,4 +172,24 @@ trait CommonPythonAggregate extends CommonPythonBase {
       Array()
     }
   }
+
+  private def getBuiltInPythonAggregateFunction(
+      javaBuiltInAggregateFunction: UserDefinedFunction): BuiltInPythonAggregateFunction = {
+    javaBuiltInAggregateFunction match {
+      case _: Count1AggFunction =>
+        BuiltInPythonAggregateFunction.COUNT1
+      case _: FirstValueWithRetractAggFunction[_] =>
+        BuiltInPythonAggregateFunction.FIRST_VALUE_RETRACT
+      case _: IntSum0AggFunction | _: ByteSum0AggFunction | _: ShortSum0AggFunction |
+           _: LongSum0AggFunction =>
+        BuiltInPythonAggregateFunction.INT_SUM0
+      case _: FloatSum0AggFunction | _: DoubleSum0AggFunction =>
+        BuiltInPythonAggregateFunction.FLOAT_SUM0
+      case _: DecimalSum0AggFunction =>
+        BuiltInPythonAggregateFunction.DECIMAL_SUM0
+      case _ =>
+        throw new TableException("This aggregate function can not be mixed with Python UDAF: " +

Review comment:
       ```suggestion
           throw new TableException("Aggregate function %s is still not 
supported to be mixed with Python UDAF: " +
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

