[ 
https://issues.apache.org/jira/browse/FLINK-36847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36847:
-----------------------------------
    Labels: pull-request-available  (was: )

> Table API toDataStream()  cannot be converted to an enum type field
> -------------------------------------------------------------------
>
>                 Key: FLINK-36847
>                 URL: https://issues.apache.org/jira/browse/FLINK-36847
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.1
>         Environment: jdk: 1.8
> flink: 1.16.1
>  
>            Reporter: LuJiang
>            Priority: Not a Priority
>              Labels: pull-request-available
>
> toDataStream(table, clazz) fails when the POJO class contains enum
> fields. The exception occurs while searching for the class constructor in:
> org.apache.flink.table.types.extraction.DataTypeExtractor#extractStructuredType
> {code:java}
> private DataType extractStructuredType(
>             DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
>         final Class<?> clazz = toClass(type);
>         if (clazz == null) {
>             throw extractionError("Not a class type.");
>         }        validateStructuredClass(clazz);
>         validateStructuredSelfReference(type, typeHierarchy);        final 
> List<Field> fields = collectStructuredFields(clazz);        if 
> (fields.isEmpty()) {
>             throw extractionError("Class '%s' has no fields.", 
> clazz.getName());
>         }        // if not all fields are mutable, a default constructor is 
> not enough
>         final boolean allFieldsMutable =
>                 fields.stream()
>                         .allMatch(
>                                 f -> {
>                                     validateStructuredFieldReadability(clazz, 
> f);
>                                     return isStructuredFieldMutable(clazz, f);
>                                 });        final 
> ExtractionUtils.AssigningConstructor constructor =
>                 extractAssigningConstructor(clazz, fields);
>         if (!allFieldsMutable && constructor == null) {
>             throw extractionError(
>                     "Class '%s' has immutable fields and thus requires a 
> constructor that is publicly "
>                             + "accessible and assigns all fields: %s",
>                     clazz.getName(),
>                     
> fields.stream().map(Field::getName).collect(Collectors.joining(", ")));
>         }
>         // check for a default constructor otherwise
>         else if (constructor == null && !hasInvokableConstructor(clazz)) {
>             throw extractionError(
>                     "Class '%s' has neither a constructor that assigns all 
> fields nor a default constructor.",
>                     clazz.getName());
>         }        final Map<String, DataType> fieldDataTypes =
>                 extractStructuredTypeFields(template, typeHierarchy, type, 
> fields);        final DataTypes.Field[] attributes =
>                 createStructuredTypeAttributes(constructor, fieldDataTypes);  
>       return DataTypes.STRUCTURED(clazz, attributes);
>     }{code}
> extractAssigningConstructor cannot find any available constructor.
>  
> It is easy to reproduce as follows:
>  
>  
> {code:java}
> @Test
> public void testTable2Pojo() throws Exception {
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     String sql = readFromClasspath("table2pojo.sql");
>     tEnv.executeSql(sql);
>     Table table = tEnv.sqlQuery("select * from my_user");
>     DataStream<MyUser> dataStream = tEnv.toDataStream(table, MyUser.class);
>     dataStream.process(new ProcessFunction<MyUser, String>() {
>         @Override
>         public void processElement(MyUser value, ProcessFunction<MyUser, 
> String>.Context ctx, Collector<String> out) throws Exception {
>             out.collect(JSON.toJSONString(value));
>         }
>     }).addSink(new PrintSinkFunction<>());
>     env.execute(getClass().getSimpleName());
> }
> @Data
> @Accessors(chain = true)
> public class MyUser {
>     private String username;
>     private Integer age;
>     private MyExtInfo info;
> }
> @Data
> @Accessors(chain = true)
> public class MyExtInfo {
>     private String family;
>     private String myHobby;
>     private MyJobType job;
> }
> public enum MyJobType {
>     Teacher,
>     Artist,
>     Scientist;
> }
> // table2pojo.sql
> create table my_user
> (
>     username   STRING,
>     age    INT,
>     info ROW <
>         my_hobby STRING,
>     family STRING,
>     job    STRING >
> )
> with
>     ( 'connector' = 'kafka',
>         'value.format' ='json',
>         'topic'='test',
>         'properties.group.id'='test001',
>         'properties.auto.offset.reset'= 'latest',
>         'properties.bootstrap.servers'='localhost:50000',
>         'value.fields-include' ='ALL')
> {code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to