[
https://issues.apache.org/jira/browse/FLINK-27799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser closed FLINK-27799.
----------------------------------
Resolution: Information Provided
> Version 1.13.5 is not compatible with version 1.10 UDF
> ------------------------------------------------------
>
> Key: FLINK-27799
> URL: https://issues.apache.org/jira/browse/FLINK-27799
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.5
> Reporter: Yichao Yang
> Priority: Major
>
> Flink 1.10 Version,The following code will work
>
> {code:java}
> // UDF
> public class SetStringUDF extends ScalarFunction {
> // @DataTypeHint("RAW")
> public Set<String> eval(String input) {
> return Sets.newHashSet(input, input + "_1", input + "_2");
> }
> @Override
> public TypeInformation<?> getResultType(Class<?>[] signature) {
> return TypeInformation.of(new TypeHint<Set<String>>() {
> });
> }
> }
> public class GetSetValue extends ScalarFunction {
> public String eval(Set<String> set) {
> return set.iterator().next();
> }
> }
> StreamTableEnvironment.createFunction("set_string", SetStringUDF.class);
> StreamTableEnvironment.createFunction("get_set_value", GetSetValue.class);
> CREATE TABLE Orders (
> order_id BIGINT NOT NULL,
> name STRING,
> row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
> WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.name.length' = '1',
> 'fields.order_id.min' = '1',
> 'fields.order_id.max' = '10'
> );CREATE TABLE target_table (
> order_id BIGINT NOT NULL,
> name STRING,
> row_time timestamp(3),
> i STRING
> ) WITH (
> 'connector' = 'print'
> );
> INSERT INTO target_table
> SELECT *, cast(get_set_value(set_string(name)) as string) as i
> FROM Orders{code}
> but in Flink 1.13.5,it will throw exception like:
>
>
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract
> a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a
> structured type was also not successful.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
> ... 36 more
> Caused by: org.apache.flink.table.api.ValidationException: Class
> 'java.util.Set' must not be abstract.
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
> at
> org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
> at
> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
> ... 37 more {code}
>
>
> I have to change my UDF to fix this problem.
>
> {code:java}
> public class GetSetValue extends ScalarFunction {
> public String eval(@DataTypeHint("RAW") Object set) {
>
> Set<String> s = (Set<String>) set;
>
> return s.iterator().next();
> }
> }
> public class SetStringUDF extends ScalarFunction {
> @DataTypeHint("RAW")
> public Object eval(String input) {
> return Sets.newHashSet(input, input + "_1", input + "_2");
> }
> }
> {code}
>
>
> I have two questions:
> # At present, is there a way to be compatible with this problem without
> changing the code?
> # If 1 is not。We need fix all of the UDFs,it will be a lot work to do. Can
> there be a plan to complete compatibility in the future
--
This message was sent by Atlassian Jira
(v8.20.7#820007)