[https://issues.apache.org/jira/browse/FLINK-25238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Praneeth Ramesh updated FLINK-25238:
------------------------------------
Description:
I have a stream that uses an Iceberg table as its source. The table has a few columns of array type, and I read it using the Iceberg connector.
Flink version: 1.13.2
Iceberg Flink version: 0.12.1
I see the following error:
java.lang.ClassCastException: class org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData cannot be cast to class org.apache.flink.table.data.ColumnarArrayData (org.apache.iceberg.flink.data.FlinkParquetReaders$ReusableArrayData and org.apache.flink.table.data.ColumnarArrayData are in unnamed module of loader 'app')
at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at org.apache.iceberg.flink.source.StreamingReaderOperator.processSplits(StreamingReaderOperator.java:155)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:834)
This could be the same issue as https://issues.apache.org/jira/browse/FLINK-21247, except that it happens for a different type.
I see that Iceberg uses its own array data classes rather than the classes from org.apache.flink.table.data, for example org.apache.iceberg.flink.data.FlinkParquetReaders.ReusableArrayData, and these classes are not handled in org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.
!Screen Shot 2021-12-09 at 6.58.56 PM.png!
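To illustrate the failure mode described above, here is a minimal, stdlib-only Java sketch. All class names are hypothetical stand-ins, not Flink or Iceberg classes. Based only on the stack trace, the sketch assumes that the copy routine recognizes a fixed set of array classes and casts anything else to a columnar class, so a third implementation of the same interface fails at that cast with a ClassCastException.

```java
import java.util.Arrays;

// Hypothetical stand-in for an array-data interface (int elements only).
interface IntArrayView {
    int size();
    int getInt(int pos);
}

// Stand-in for a class the copy routine recognizes explicitly.
final class GenericIntArray implements IntArrayView {
    final int[] values;
    GenericIntArray(int[] values) { this.values = values; }
    public int size() { return values.length; }
    public int getInt(int pos) { return values[pos]; }
}

// Stand-in for the columnar class the copy routine assumes in its last branch.
final class ColumnarIntArray implements IntArrayView {
    final int[] column;
    final int offset;
    final int length;
    ColumnarIntArray(int[] column, int offset, int length) {
        this.column = column;
        this.offset = offset;
        this.length = length;
    }
    public int size() { return length; }
    public int getInt(int pos) { return column[offset + pos]; }
}

// Stand-in for a connector-specific implementation the copy routine never anticipated.
final class ReusableIntArray implements IntArrayView {
    private final int[] values;
    ReusableIntArray(int[] values) { this.values = values; }
    public int size() { return values.length; }
    public int getInt(int pos) { return values[pos]; }
}

public class StrictArrayCopy {
    // Copies only the implementations it knows about. Anything that is not a
    // GenericIntArray is assumed to be columnar, so a third implementation
    // fails at the cast with ClassCastException.
    static IntArrayView copy(IntArrayView from) {
        if (from instanceof GenericIntArray) {
            return new GenericIntArray(((GenericIntArray) from).values.clone());
        }
        ColumnarIntArray col = (ColumnarIntArray) from; // throws for unknown types
        int[] slice = Arrays.copyOfRange(col.column, col.offset, col.offset + col.length);
        return new ColumnarIntArray(slice, 0, slice.length);
    }

    public static void main(String[] args) {
        try {
            copy(new ReusableIntArray(new int[] {1, 2, 3}));
        } catch (ClassCastException e) {
            System.out.println("copy failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Running `main` prints `copy failed: ClassCastException`, which mirrors the shape of the error in the report: the value is a valid implementation of the interface, but not one of the concrete classes the copy routine expects.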
As an experiment, I changed the code above to handle the Iceberg type as a binary array, built it locally, and used it in my application, and that worked.
!Screen Shot 2021-12-09 at 7.04.10 PM.png!
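The workaround described above, treating the unknown Iceberg array as a binary array, can be sketched generically: instead of casting, the copy routine reads any unrecognized implementation element by element through the interface and materializes it into a representation it owns. This is a hypothetical, stdlib-only illustration, not the actual patch shown in the screenshot. `OwnedIntArray` plays the role of a binary array and `ReusableInts` plays the role of a connector's reusable array; neither is a Flink or Iceberg API.

```java
// Hypothetical stand-in for an array-data interface (int elements only).
interface IntArrayData {
    int size();
    int getInt(int pos);
}

// Plays the role of a binary array: it owns its values, so copies are independent.
final class OwnedIntArray implements IntArrayData {
    final int[] values;
    OwnedIntArray(int[] values) { this.values = values; }
    public int size() { return values.length; }
    public int getInt(int pos) { return values[pos]; }
}

// Plays the role of a connector's reusable array: the reader overwrites the
// same buffer for every record, so downstream operators must copy it.
final class ReusableInts implements IntArrayData {
    private int[] buffer = new int[0];
    private int count;

    void reset(int[] values) {
        if (buffer.length < values.length) {
            buffer = new int[values.length];
        }
        System.arraycopy(values, 0, buffer, 0, values.length);
        count = values.length;
    }

    public int size() { return count; }
    public int getInt(int pos) { return buffer[pos]; }
}

public class FallbackArrayCopy {
    static IntArrayData copy(IntArrayData from) {
        if (from instanceof OwnedIntArray) {
            return new OwnedIntArray(((OwnedIntArray) from).values.clone());
        }
        // Fallback for any other implementation: read through the interface
        // instead of casting to a concrete class.
        return toOwned(from);
    }

    // Materializes an arbitrary implementation into an owned array.
    static OwnedIntArray toOwned(IntArrayData from) {
        int[] values = new int[from.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = from.getInt(i);
        }
        return new OwnedIntArray(values);
    }

    public static void main(String[] args) {
        ReusableInts reused = new ReusableInts();
        reused.reset(new int[] {1, 2, 3});
        IntArrayData copied = copy(reused);
        reused.reset(new int[] {7, 7, 7}); // the reader reuses its buffer for the next record
        System.out.println(copied.getInt(0)); // prints 1
    }
}
```

The design choice in this sketch is that the fallback depends only on the interface, so it keeps working for implementations the copy routine has never seen, at the cost of a per-element copy. Because the copy owns its data, it is also unaffected when the reader overwrites its reusable buffer.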
I am not sure whether this is already handled in newer versions.
> flink iceberg source reading array types fail with Cast Exception
> -----------------------------------------------------------------
>
> Key: FLINK-25238
> URL: https://issues.apache.org/jira/browse/FLINK-25238
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.13.2
> Reporter: Praneeth Ramesh
> Priority: Major
> Attachments: Screen Shot 2021-12-09 at 6.58.56 PM.png, Screen Shot
> 2021-12-09 at 7.04.10 PM.png
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)