[
https://issues.apache.org/jira/browse/FLINK-27799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17543980#comment-17543980
]
Martijn Visser commented on FLINK-27799:
----------------------------------------
[~yangyichao] Unfortunately there's currently no way to work around this. As
with any new Flink version, changes can be made to interfaces that are
annotated as Experimental or PublicEvolving. Only interfaces annotated with
Public won't change unless Flink is bumped to a new major version.
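For reference, this is roughly how those stability annotations from org.apache.flink.annotation look on an interface (the interface names below are made up for illustration, they are not real Flink classes):

{code:java}
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

@Public          // stable; only changes with a new major Flink version
public interface ExampleStableApi {}

@PublicEvolving  // may change between minor versions, e.g. 1.10 -> 1.13
public interface ExampleEvolvingApi {}

@Experimental    // no compatibility guarantees
public interface ExampleExperimentalApi {}
{code}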
> Version 1.13.5 is not compatible with version 1.10 UDF
> ------------------------------------------------------
>
> Key: FLINK-27799
> URL: https://issues.apache.org/jira/browse/FLINK-27799
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.5
> Reporter: Yichao Yang
> Priority: Major
>
> In Flink 1.10, the following code works:
>
> {code:java}
> // UDFs
> public class SetStringUDF extends ScalarFunction {
>     // @DataTypeHint("RAW")
>     public Set<String> eval(String input) {
>         return Sets.newHashSet(input, input + "_1", input + "_2");
>     }
>
>     @Override
>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>         return TypeInformation.of(new TypeHint<Set<String>>() {});
>     }
> }
>
> public class GetSetValue extends ScalarFunction {
>     public String eval(Set<String> set) {
>         return set.iterator().next();
>     }
> }
>
> // registration (tEnv is the StreamTableEnvironment instance)
> tEnv.createFunction("set_string", SetStringUDF.class);
> tEnv.createFunction("get_set_value", GetSetValue.class);
>
> CREATE TABLE Orders (
>     order_id BIGINT NOT NULL,
>     name STRING,
>     row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
>     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '10',
>     'fields.name.length' = '1',
>     'fields.order_id.min' = '1',
>     'fields.order_id.max' = '10'
> );
>
> CREATE TABLE target_table (
>     order_id BIGINT NOT NULL,
>     name STRING,
>     row_time TIMESTAMP(3),
>     i STRING
> ) WITH (
>     'connector' = 'print'
> );
>
> INSERT INTO target_table
> SELECT *, CAST(get_set_value(set_string(name)) AS STRING) AS i
> FROM Orders
> {code}
> but in Flink 1.13.5 it throws an exception like:
>
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
>     ... 36 more
> Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
>     at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
>     ... 37 more
> {code}
>
>
> I had to change my UDFs as follows to fix the problem:
>
> {code:java}
> public class GetSetValue extends ScalarFunction {
>     public String eval(@DataTypeHint("RAW") Object set) {
>         Set<String> s = (Set<String>) set;
>         return s.iterator().next();
>     }
> }
>
> public class SetStringUDF extends ScalarFunction {
>     @DataTypeHint("RAW")
>     public Object eval(String input) {
>         return Sets.newHashSet(input, input + "_1", input + "_2");
>     }
> }
> {code}
>
>
> I have two questions:
> # At present, is there a way to stay compatible without changing the code?
> # If not, we would have to fix all of our UDFs, which is a lot of work. Is there a plan to restore full compatibility in a future version? (A rough sketch of one way to limit the change is below.)
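> If every UDF does have to change, one idea (a sketch I have not verified against 1.13.5; the class-level hint placement is my assumption) is to declare the RAW output once per class with @FunctionHint instead of annotating every eval method:
>
> {code:java}
> import java.util.Arrays;
> import java.util.HashSet;
>
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.annotation.FunctionHint;
> import org.apache.flink.table.functions.ScalarFunction;
>
> // Unverified sketch: the class-level hint marks the output as RAW for all eval
> // methods, so only the return type has to be widened to Object.
> @FunctionHint(output = @DataTypeHint("RAW"))
> public class SetStringUDF extends ScalarFunction {
>     public Object eval(String input) {
>         return new HashSet<>(Arrays.asList(input, input + "_1", input + "_2"));
>     }
> }
> {code}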
--
This message was sent by Atlassian Jira
(v8.20.7#820007)