[
https://issues.apache.org/jira/browse/FLINK-25238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17518538#comment-17518538
]
Yi Tang commented on FLINK-25238:
---------------------------------
Hi [~gyfora], thanks for helping merge these fixes.
Since release-1.15 has not been released officially yet, I'm not sure whether it is
appropriate to backport the fix to 1.13 under the upgrade policy. What's your opinion?
Will there be another bugfix release for version 1.13?
> flink iceberg source reading array types fail with Cast Exception
> -----------------------------------------------------------------
>
> Key: FLINK-25238
> URL: https://issues.apache.org/jira/browse/FLINK-25238
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.2, 1.15.0, 1.14.4
> Reporter: Praneeth Ramesh
> Assignee: Gyula Fora
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: Screen Shot 2021-12-09 at 6.58.56 PM.png, Screen Shot
> 2021-12-09 at 7.04.10 PM.png
>
>
> I have a streaming job that uses an Iceberg table as its source. The table has a
> few columns of array types, and I read it with the Iceberg Flink connector.
> Flink Version: 1.13.2
> Iceberg Flink Version: 0.12.1
>
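> For reference, a minimal reproduction sketch of this setup might look like the
> following. The table location and job name are hypothetical; the read itself uses
> the Iceberg Flink connector's FlinkSource streaming API, which matches the
> StreamingReaderOperator that shows up in the stack trace below.
> {code:java}
> // Hypothetical reproduction sketch: stream RowData out of an Iceberg table
> // that contains array-typed columns. The table location is made up.
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.data.RowData;
> import org.apache.iceberg.flink.TableLoader;
> import org.apache.iceberg.flink.source.FlinkSource;
>
> public class IcebergArrayReadRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Assumed Hadoop-catalog table location; the table has array<...> columns.
>         TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");
>
>         DataStream<RowData> stream = FlinkSource.forRowData()
>                 .env(env)
>                 .tableLoader(tableLoader)
>                 .streaming(true)
>                 .build();
>
>         // Records copied between the chained source and sink go through
>         // ArrayDataSerializer.copy, which is where the exception below is thrown.
>         stream.print();
>
>         env.execute("iceberg-array-read-repro");
>     }
> }
> {code}
>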
> I see the following error:
> java.lang.ClassCastException: class org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData cannot be cast to class org.apache.flink.table.data.ColumnarArrayData (org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData and org.apache.flink.table.data.ColumnarArrayData are in unnamed module of loader 'app')
> at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
> at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
> at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
> at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
> at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
> at org.apache.iceberg.flink.source.StreamingReaderOperator.processSplits(StreamingReaderOperator.java:155)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
> This could be the same issue as https://issues.apache.org/jira/browse/FLINK-21247,
> except that it happens for another type.
> I see that Iceberg uses custom types instead of the types from
> org.apache.flink.table.data, such as
> org.apache.iceberg.flink.data.FlinkParquetReaders.ReusableArrayData, and these
> types are not handled in
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.
> !Screen Shot 2021-12-09 at 6.58.56 PM.png!
> Just to try, I changed the above code to handle the Iceberg type as a binary
> array, built it locally, and used it in my application; that worked.
>
> !Screen Shot 2021-12-09 at 7.04.10 PM.png!
> I'm not sure whether this is already handled in some newer versions.
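> For illustration only, a cast-free copy fallback for unknown ArrayData
> implementations could look roughly like the sketch below. This is not the
> reporter's patch (that change converts to a binary array, see the screenshot
> above) and not the actual Flink fix; it is a hypothetical helper, with a made-up
> class name, that relies only on the public ArrayData.ElementGetter API:
> {code:java}
> // Hypothetical sketch: copy an arbitrary ArrayData implementation (e.g. Iceberg's
> // ReusableArrayData) without casting it to ColumnarArrayData, by materializing its
> // elements into a GenericArrayData. Nested objects are not deep-copied here.
> import org.apache.flink.table.data.ArrayData;
> import org.apache.flink.table.data.GenericArrayData;
> import org.apache.flink.table.types.logical.LogicalType;
>
> public final class GenericArrayCopyFallback {
>
>     private GenericArrayCopyFallback() {}
>
>     public static ArrayData copyToGeneric(ArrayData from, LogicalType elementType) {
>         ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
>         Object[] elements = new Object[from.size()];
>         for (int i = 0; i < from.size(); i++) {
>             // getElementOrNull reads each element according to its logical type,
>             // returning null for null entries.
>             elements[i] = elementGetter.getElementOrNull(from, i);
>         }
>         return new GenericArrayData(elements);
>     }
> }
> {code}
> A branch along these lines for ArrayData implementations that the serializer does
> not recognize would avoid the unconditional cast to ColumnarArrayData that fails in
> the stack trace above.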
--
This message was sent by Atlassian Jira
(v8.20.1#820001)