[
https://issues.apache.org/jira/browse/FLINK-27776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Anderson updated FLINK-27776:
-----------------------------------
Summary: Throw exception when UDAF used in sliding window does not
implement merge method in PyFlink (was: Throws exception when udaf used in
sliding window does not implement merge method in PyFlink)
> Throw exception when UDAF used in sliding window does not implement merge
> method in PyFlink
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-27776
> URL: https://issues.apache.org/jira/browse/FLINK-27776
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Affects Versions: 1.15.0, 1.13.6, 1.14.4
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0, 1.14.5, 1.15.1
>
>
> We use the pane state to optimize the result of calculating the window state,
> which requires udaf to implement the merge method. However, due to the lack
> of detection of whether the merge method of udaf is implemented, the user's
> output result did not meet his expectations and there is no exception. Below
> is an example of a UDAF that implements the merge method:
> {code:python}
> class SumAggregateFunction(AggregateFunction):
> def get_value(self, accumulator):
> return accumulator[0]
> def create_accumulator(self):
> return [0]
> def accumulate(self, accumulator, *args):
> accumulator[0] = accumulator[0] + args[0]
> def retract(self, accumulator, *args):
> accumulator[0] = accumulator[0] - args[0]
> def merge(self, accumulator, accumulators):
> for other_acc in accumulators:
> accumulator[0] = accumulator[0] + other_acc[0]
> def get_accumulator_type(self):
> return DataTypes.ARRAY(DataTypes.BIGINT())
> def get_result_type(self):
> return DataTypes.BIGINT()
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)