dianfu commented on a change in pull request #13843:
URL: https://github.com/apache/flink/pull/13843#discussion_r514680464
##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
return Row(*fields)
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+ # for built in functions we extract the data view specs from their
accumulator
+ i = -1
+ extracted_specs = []
+ for field in accumulator:
+ i += 1
+ # TODO: infer the coder from the input types and output type of the
built-in functions
+ if isinstance(field, MapView):
+ extracted_specs.append(MapViewSpec(
+ "builtInAgg%df%d" % (current_index, i), i, PickleCoder(),
PickleCoder()))
+ elif isinstance(field, ListView):
+ extracted_specs.append(ListViewSpec(
+ "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+ return extracted_specs
+
+
def extract_data_view_specs(udfs):
extracted_udf_data_view_specs = []
+ current_index = -1
for udf in udfs:
+ current_index += 1
udf_data_view_specs_proto = udf.specs
- if udf_data_view_specs_proto is None:
- extracted_udf_data_view_specs.append([])
+ if not udf_data_view_specs_proto:
+ if is_built_in_function(udf.payload):
+ bulit_in_function = load_aggregate_function(udf.payload)
Review comment:
```suggestion
built_in_function = load_aggregate_function(udf.payload)
```
##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
return Row(*fields)
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+ # for built in functions we extract the data view specs from their
accumulator
+ i = -1
+ extracted_specs = []
+ for field in accumulator:
+ i += 1
+ # TODO: infer the coder from the input types and output type of the
built-in functions
+ if isinstance(field, MapView):
+ extracted_specs.append(MapViewSpec(
+ "builtInAgg%df%d" % (current_index, i), i, PickleCoder(),
PickleCoder()))
+ elif isinstance(field, ListView):
+ extracted_specs.append(ListViewSpec(
+ "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+ return extracted_specs
+
+
def extract_data_view_specs(udfs):
extracted_udf_data_view_specs = []
+ current_index = -1
for udf in udfs:
+ current_index += 1
udf_data_view_specs_proto = udf.specs
- if udf_data_view_specs_proto is None:
- extracted_udf_data_view_specs.append([])
+ if not udf_data_view_specs_proto:
+ if is_built_in_function(udf.payload):
+ bulit_in_function = load_aggregate_function(udf.payload)
+ accumulator = bulit_in_function.create_accumulator()
+ extracted_udf_data_view_specs.append(
+ extract_data_view_specs_from_accumulator(current_index,
accumulator))
+ else:
+ extracted_udf_data_view_specs.append([])
+ continue
Review comment:
What about enclosing the following code in an *else* block? Then we can
remove the *continue*, which will make the code more readable.
##########
File path: flink-python/pyflink/table/tests/test_aggregate.py
##########
@@ -256,6 +258,59 @@ def test_double_aggregate(self):
.select("my_count(a) as a, my_sum(b) as b")
assert_frame_equal(result.to_pandas(), pd.DataFrame([[3, 12]],
columns=['a', 'b']))
+ def test_mixed_with_built_in_functions_with_retract(self):
Review comment:
Could we merge the following two test cases into one, or even merge
them into an existing test case?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupAggregateRule.java
##########
@@ -67,13 +67,14 @@ public boolean matches(RelOptRuleCall call) {
aggCalls.stream().anyMatch(x ->
PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
boolean existPandasFunction =
aggCalls.stream().anyMatch(x ->
PythonUtil.isPythonAggregate(x, PythonFunctionKind.PANDAS));
- boolean existJavaFunction =
- aggCalls.stream().anyMatch(x ->
!PythonUtil.isPythonAggregate(x, null));
+ boolean existUserDefinedJavaFunction =
Review comment:
```suggestion
boolean existJavaUserDefinedFunction =
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonAggregate.scala
##########
@@ -158,4 +172,24 @@ trait CommonPythonAggregate extends CommonPythonBase {
Array()
}
}
+
+ private def getBuiltInPythonAggregateFunction(
+ javaBuiltInAggregateFunction: UserDefinedFunction):
BuiltInPythonAggregateFunction = {
+ javaBuiltInAggregateFunction match {
+ case _: Count1AggFunction =>
+ BuiltInPythonAggregateFunction.COUNT1
+ case _: FirstValueWithRetractAggFunction[_] =>
+ BuiltInPythonAggregateFunction.FIRST_VALUE_RETRACT
+ case _: IntSum0AggFunction | _: ByteSum0AggFunction | _:
ShortSum0AggFunction |
+ _: LongSum0AggFunction =>
+ BuiltInPythonAggregateFunction.INT_SUM0
+ case _: FloatSum0AggFunction | _: DoubleSum0AggFunction =>
+ BuiltInPythonAggregateFunction.FLOAT_SUM0
+ case _: DecimalSum0AggFunction =>
+ BuiltInPythonAggregateFunction.DECIMAL_SUM0
+ case _ =>
+ throw new TableException("This aggregate function can not be mixed
with Python UDAF: " +
Review comment:
```suggestion
throw new TableException("Aggregate function %s is still not
supported to be mixed with Python UDAF: " +
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]