[ 
https://issues.apache.org/jira/browse/FLINK-21247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21247:
-------------------------------
    Fix Version/s:     (was: 1.13.0)
                   1.13.1
                   1.14.0

> flink iceberg table map<string,string> cannot convert to datastream
> -------------------------------------------------------------------
>
>                 Key: FLINK-21247
>                 URL: https://issues.apache.org/jira/browse/FLINK-21247
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.1
>         Environment: iceberg master
> flink 1.12
>  
>  
>            Reporter: donglei
>            Assignee: Shuo Cheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.1
>
>         Attachments: image-2021-02-03-15-38-42-340.png, 
> image-2021-02-03-15-40-27-055.png, image-2021-02-03-15-41-34-426.png, 
> image-2021-02-03-15-43-19-919.png, image-2021-02-03-15-52-12-493.png, 
> image-2021-02-03-15-53-18-244.png
>
>
> Flink Iceberg Table with map<string,string>
> !image-2021-02-03-15-38-42-340.png!
>  
> We want to read the table like this:
>  
> String querySql = "SELECT "
>     + "ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,client_ip,upload_apn,event_code,event_result,package_size,consume_time,event_value,event_time,upload_time,boundle_id,uin,platform,os_version,channel,brand,model"
>     + " from bfzt3";
> Table table = tEnv.sqlQuery(querySql);
> // "map" is the field-name-to-type mapping for AttaInfo (its definition is not shown in this report)
> DataStream<AttaInfo> sinkStream = tEnv.toAppendStream(table, Types.POJO(AttaInfo.class, map));
> // Count rows: map each record to 1, route everything to a single key, then sum
> sinkStream.map(x -> 1).returns(Types.INT)
>     .keyBy(new NullByteKeySelector<Integer>())
>     .reduce((x, y) -> x + y)
>     .print();
>  
>  
> Reading the table throws an exception:
>  
> 2021-02-03 15:37:57
> java.lang.ClassCastException: org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>  
> We found that Iceberg represents the map as ReusableMapData, which implements MapData:
> !image-2021-02-03-15-40-27-055.png!
>  
> This is where the exception occurs:
> !image-2021-02-03-15-41-34-426.png!
> MapData has two default implementations, GenericMapData and BinaryMapData,
> but Iceberg provides its own implementation, ReusableMapData.
>  
> So I think the code should be changed like this:
> !image-2021-02-03-15-43-19-919.png!
>  
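
The proposed change exists only as a screenshot, so it is not reproduced here. As a rough illustration of the failure mode described above, the following self-contained sketch uses simplified stand-in types (the nested MapData, BinaryMapData and ReusableMapData below are hypothetical and are not Flink's or Iceberg's real classes). It shows how a copy routine that assumes one concrete implementation fails with a ClassCastException, and how a copy that converts unknown implementations avoids the cast. It is a sketch under those assumptions, not the actual patch.

```java
import java.util.HashMap;
import java.util.Map;

public class MapCopySketch {

    /** Hypothetical, simplified stand-in for a MapData-style interface. */
    interface MapData {
        int size();
        Map<String, String> entries();
    }

    /** Stand-in for the one implementation the failing cast expects. */
    static final class BinaryMapData implements MapData {
        private final Map<String, String> data;
        BinaryMapData(Map<String, String> data) { this.data = new HashMap<>(data); }
        public int size() { return data.size(); }
        public Map<String, String> entries() { return data; }
        BinaryMapData copy() { return new BinaryMapData(data); }
    }

    /** Stand-in for a connector-specific implementation that reuses its object. */
    static final class ReusableMapData implements MapData {
        private final Map<String, String> data = new HashMap<>();
        void put(String key, String value) { data.put(key, value); }
        public int size() { return data.size(); }
        public Map<String, String> entries() { return data; }
    }

    /** Mirrors the reported failure: assumes every MapData is a BinaryMapData. */
    static MapData unsafeCopy(MapData from) {
        return ((BinaryMapData) from).copy();
    }

    /** Defensive variant: direct copy for BinaryMapData, conversion for anything else. */
    static MapData safeCopy(MapData from) {
        if (from instanceof BinaryMapData) {
            return ((BinaryMapData) from).copy();
        }
        // Unknown implementation: build an independent copy from its entries.
        return new BinaryMapData(from.entries());
    }

    public static void main(String[] args) {
        ReusableMapData reused = new ReusableMapData();
        reused.put("k", "v");
        try {
            unsafeCopy(reused);
        } catch (ClassCastException e) {
            System.out.println("unsafeCopy failed with ClassCastException");
        }
        MapData copied = safeCopy(reused);
        reused.put("k2", "v2"); // later mutation of the reused object does not affect the copy
        System.out.println(copied.size());
    }
}
```

The design point is that a serializer-style copy should depend only on the interface contract, and fall back to a generic conversion when it meets an implementation it does not recognize.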



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
