[ 
https://issues.apache.org/jira/browse/FLINK-27799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-27799.
----------------------------------
    Resolution: Information Provided

> Version 1.13.5 is not compatible with version 1.10 UDF
> ------------------------------------------------------
>
>                 Key: FLINK-27799
>                 URL: https://issues.apache.org/jira/browse/FLINK-27799
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.13.5
>            Reporter: Yichao Yang
>            Priority: Major
>
> In Flink 1.10, the following code works:
>  
> {code:java}
> // UDF
> public class SetStringUDF extends ScalarFunction {
> //    @DataTypeHint("RAW")
>     public Set<String> eval(String input) {
>         return Sets.newHashSet(input, input + "_1", input + "_2");
>     }
>     @Override
>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>         return TypeInformation.of(new TypeHint<Set<String>>() {
>         });
>     }
> }
> public class GetSetValue extends ScalarFunction {
>     public String eval(Set<String> set) {
>         return set.iterator().next();
>     }
> }
> StreamTableEnvironment.createFunction("set_string", SetStringUDF.class); 
> StreamTableEnvironment.createFunction("get_set_value", GetSetValue.class);
> CREATE TABLE Orders (
>     order_id BIGINT NOT NULL,
>     name STRING,
>     row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
>     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '10',
>   'fields.name.length' = '1',
>   'fields.order_id.min' = '1',
>   'fields.order_id.max' = '10'
> );
> CREATE TABLE target_table (
>     order_id BIGINT NOT NULL,
>     name STRING,
>     row_time timestamp(3),
>     i STRING
> ) WITH (
>   'connector' = 'print'
> );
> INSERT INTO target_table
> SELECT *, cast(get_set_value(set_string(name)) as string) as i
> FROM Orders{code}
> In Flink 1.13.5, however, it throws an exception like:
>  
>  
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>     ... 36 more
> Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
>     ... 37 more
> {code}
>  
>  
> I had to change my UDFs as follows to work around this problem:
>  
> {code:java}
> public class GetSetValue extends ScalarFunction {
>     public String eval(@DataTypeHint("RAW") Object set) {
>         
>         Set<String> s = (Set<String>) set;
>         
>         return s.iterator().next();
>     }
> }
> public class SetStringUDF extends ScalarFunction {
>     @DataTypeHint("RAW")
>     public Object eval(String input) {
>         return Sets.newHashSet(input, input + "_1", input + "_2");
>     }
> }
>  {code}
>  
>  
> I have two questions:
>  # Is there currently a way to keep the 1.10-style UDFs working on 1.13.5 without changing their code?
>  # If not, we will need to fix all of our UDFs, which is a lot of work. Is there a plan to restore full compatibility in the future?
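
For readers of this thread, the second "Caused by" in the trace and the reporter's workaround can be illustrated with a minimal plain-Java sketch. It uses no Flink classes and is not Flink's actual implementation. The class name `UdfTypeSketch` and its methods are hypothetical, and `isAbstractType` only assumes that the check named in the trace (`validateStructuredClass`) rejects abstract classes and interfaces, as the error message suggests. A `LinkedHashSet` replaces the reporter's Guava `Sets.newHashSet` so that the iteration order is predictable.

```java
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Plain-Java sketch; no Flink classes are involved. All names are hypothetical.
class UdfTypeSketch {

    // Simplified stand-in for a "must not be abstract" validation.
    // Interfaces carry the ABSTRACT modifier, so Set.class is reported as abstract.
    static boolean isAbstractType(Class<?> clazz) {
        return Modifier.isAbstract(clazz.getModifiers());
    }

    // Shape of the reworked SetStringUDF.eval: the declared return type is Object,
    // so the signature no longer exposes Set<String>.
    static Object setString(String input) {
        Set<String> values = new LinkedHashSet<>(); // insertion-ordered for a predictable result
        values.add(input);
        values.add(input + "_1");
        values.add(input + "_2");
        return values;
    }

    // Shape of the reworked GetSetValue.eval: accept Object and cast back to Set<String>.
    @SuppressWarnings("unchecked")
    static String getSetValue(Object set) {
        Set<String> s = (Set<String>) set;
        return s.iterator().next();
    }

    public static void main(String[] args) {
        System.out.println(isAbstractType(Set.class));     // true
        System.out.println(isAbstractType(HashSet.class)); // false
        System.out.println(getSetValue(setString("a")));   // a
    }
}
```

The sketch only shows why an interface type such as `Set` fails a concreteness check and how the `Object`-typed signatures in the workaround avoid exposing it. In the real UDFs, the `@DataTypeHint("RAW")` annotations shown in the issue are still what tell Flink how to treat the value.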



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
