[
https://issues.apache.org/jira/browse/FLINK-21247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17278483#comment-17278483
]
Zheng Hu edited comment on FLINK-21247 at 2/4/21, 2:24 AM:
-----------------------------------------------------------
[~jark], assume there are 5 records in an Iceberg table (table schema:
create table test (row map<string, string>)):
{code}
row1: map<{"a", "1"}, {"b", "2"}, {"c", "3"}>
row2: map<{"a", "1"}>
row3: map<{"c", "111"}>
row4: map<{"d", "11"}>
row5: map<{"e", "1"}>
{code}
If we don't reuse objects, we have to create a new RowData object for every
row, which produces so many short-lived young-generation objects that it
causes GC pressure. If we do reuse the object, then all of the child fields
inside it become reusable as well. In this case we need a ReusableMapData to
hold the key-value pairs of row1; for row2, the ReusableMapData already has
enough space to hold row2's map, so we don't have to allocate a new map,
which really reduces heap object allocations.
In the Apache Iceberg generic Parquet reader & writer, we use this reuse
strategy for all the compute engines. A similar data structure is
ReusableArrayData, which implements the ArrayData interface; I think we also
need to fix it in this PR. Thanks.
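The reuse idea above can be sketched as follows. This is a minimal, hypothetical stand-in written for illustration only; Iceberg's real ReusableMapData works against Flink's MapData interface and holds reusable child structures, but the capacity-growth trick is the same: backing arrays grow only when an incoming map is larger than any map seen so far.

```java
// Illustrative sketch of the reuse pattern, NOT Iceberg's actual
// ReusableMapData. Smaller subsequent maps are copied into the existing
// allocation instead of triggering a fresh one.
class ReusableMapData {
    private String[] keys = new String[0];
    private String[] values = new String[0];
    private int size;

    void set(String[] newKeys, String[] newValues) {
        if (newKeys.length > keys.length) {
            // Only allocate when the new map exceeds the current capacity.
            keys = new String[newKeys.length];
            values = new String[newKeys.length];
        }
        System.arraycopy(newKeys, 0, keys, 0, newKeys.length);
        System.arraycopy(newValues, 0, values, 0, newKeys.length);
        size = newKeys.length;
    }

    int size() { return size; }
    int capacity() { return keys.length; }
    String key(int i) { return keys[i]; }
    String value(int i) { return values[i]; }
}
```

Feeding row1's 3-entry map allocates capacity 3; the 1-entry maps of the following rows then reuse that space with no new allocation.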
> flink iceberg table map<string,string> cannot convert to datastream
> -------------------------------------------------------------------
>
> Key: FLINK-21247
> URL: https://issues.apache.org/jira/browse/FLINK-21247
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Ecosystem
> Environment: iceberg master
> flink 1.12
>
>
> Reporter: donglei
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-02-03-15-38-42-340.png,
> image-2021-02-03-15-40-27-055.png, image-2021-02-03-15-41-34-426.png,
> image-2021-02-03-15-43-19-919.png, image-2021-02-03-15-52-12-493.png,
> image-2021-02-03-15-53-18-244.png
>
>
> Flink Iceberg Table with map<string,string>
> !image-2021-02-03-15-38-42-340.png!
>
> we want to read the table like this :
>
> {code}
> String querySql = "SELECT ftime,extinfo,country,province,operator,apn,gw,src_ip_head,info_str,product_id,app_version,sdk_id,sdk_version,hardware_os,qua,upload_ip,client_ip,upload_apn,event_code,event_result,package_size,consume_time,event_value,event_time,upload_time,boundle_id,uin,platform,os_version,channel,brand,model from bfzt3";
> Table table = tEnv.sqlQuery(querySql);
> DataStream<AttaInfo> sinkStream = tEnv.toAppendStream(table, Types.POJO(AttaInfo.class, map));
> sinkStream.map(x -> 1)
>     .returns(Types.INT)
>     .keyBy(new NullByteKeySelector<Integer>())
>     .reduce((x, y) -> x + y)
>     .print();
> {code}
>
>
> when reading we get an exception:
>
> {code}
> 2021-02-03 15:37:57
> java.lang.ClassCastException: org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableMapData cannot be cast to org.apache.flink.table.data.binary.BinaryMapData
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:107)
>     at org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:47)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:166)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> {code}
>
> we find that the Iceberg map is a ReusableMapData, which implements MapData
> !image-2021-02-03-15-40-27-055.png!
>
> this is the exception:
> !image-2021-02-03-15-41-34-426.png!
> MapData has two default implementations, GenericMapData and BinaryMapData;
> the Iceberg implementation is ReusableMapData
>
> so I think the code should be changed like this:
> !image-2021-02-03-15-43-19-919.png!
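A hedged sketch of the direction the reporter suggests: the copy path should not cast every MapData unconditionally to BinaryMapData, but only special-case BinaryMapData and fall back to a copy through the MapData interface for any other implementation, such as Iceberg's ReusableMapData. The types below are simplified, hypothetical stand-ins for illustration, not the real org.apache.flink.table.data classes, and the actual fix would live in Flink's MapDataSerializer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for Flink's map types (assumptions for this sketch).
interface MapData {
    Map<String, String> toJavaMap();
}

class BinaryMapData implements MapData {
    private final Map<String, String> entries;
    BinaryMapData(Map<String, String> entries) { this.entries = new LinkedHashMap<>(entries); }
    public Map<String, String> toJavaMap() { return new LinkedHashMap<>(entries); }
    BinaryMapData copy() { return new BinaryMapData(entries); }
}

class GenericMapData implements MapData {
    private final Map<String, String> entries;
    GenericMapData(Map<String, String> entries) { this.entries = new LinkedHashMap<>(entries); }
    public Map<String, String> toJavaMap() { return new LinkedHashMap<>(entries); }
}

class MapDataSerializer {
    // Instead of casting to BinaryMapData (the source of the
    // ClassCastException above), handle any other MapData implementation
    // by copying through the interface.
    static MapData copy(MapData from) {
        if (from instanceof BinaryMapData) {
            return ((BinaryMapData) from).copy();
        }
        return new GenericMapData(from.toJavaMap());
    }
}
```

With this shape, a ReusableMapData-like implementation is deep-copied rather than rejected with a ClassCastException.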
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)