[
https://issues.apache.org/jira/browse/FLINK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jianhui Dong closed FLINK-30849.
--------------------------------
Resolution: Invalid
> udaf validated failed with TableEnvironment#executeSql but work correctly
> with StreamTableEnrivorment#registerFunction
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-30849
> URL: https://issues.apache.org/jira/browse/FLINK-30849
> Project: Flink
> Issue Type: Bug
> Components: API / Core, Table SQL / API
> Reporter: Jianhui Dong
> Priority: Major
> Attachments: MultiAggToJsonArrayV2.java
>
>
> We have a udaf which has some overloaded methods and it can work in flink
> 1.12.2 with deprecated api StreamTableEnrivorment#registerFunction, but when
> use TableEnvironment#executeSql in flink 1.12.2 or flink 1.16, it will throw
> an exception as follows:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Considering all
> hints, the method should comply with the signature:
> accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
> at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
> {code}
> In flink 1.16, the method
> <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC>
> aggregateFunction); does noe exist anymore, and I want to how to rewrite the
> udaf to make it works.
> The test code is as follows:
> {code:java}
> TableEnvironment tableEnvironment =
> StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
> // tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new
> MultiAggToJsonArrayV2());
> tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2`
> AS 'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
> tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a`
> VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
> "('connector'='datagen')\n");
> tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a`
> VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n" );
> tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink`
> SELECT `a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM
> `grocery_udf_test` GROUP BY `a`"); {code}
> [^MultiAggToJsonArrayV2.java]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)