[ 
https://issues.apache.org/jira/browse/HUDI-7173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen closed HUDI-7173.
----------------------------
    Resolution: Fixed

Fixed via master branch: 2aa91ca24e2d3b52c403441215c45b3830d722ff

> Fix Hudi-on-Flink reads after schema evolution is applied on Decimal fields
> ---------------------------------------------------------------------------
>
>                 Key: HUDI-7173
>                 URL: https://issues.apache.org/jira/browse/HUDI-7173
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Voon Hou
>            Assignee: voon
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.14.1, 1.0.0
>
>
> h1. Introduction
> Hudi-on-Flink reads currently fail when reading Parquet files after either of 
> these two COMPREHENSIVE schema evolution operations involving DECIMAL types is applied:
>  # *Adding* a DECIMAL field into a struct
>  # *Reordering* a DECIMAL field in a struct
>  
> The error stack traces observed when trying to perform reads are provided below.
> h2. *Adding* a DECIMAL field into a struct
> {code:java}
> ClassCastException: 
> org.apache.hudi.table.format.cow.vector.ParquetDecimalVector cannot be cast 
> to org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
>     at 
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:547)
>     at 
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:441)
>     at 
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216)
>     at 
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156)
>     at 
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:149)
>     at 
> org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:72)
>     at 
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:362)
>     at 
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:337)
>     at 
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:201)
>     at 
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:202)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>     at java.lang.Thread.run(Thread.java:748){code}
> h2. *Reordering* a DECIMAL field in a struct
> {code:java}
> ClassCastException: 
> org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be 
> cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
>     at 
> org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
>     at 
> org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:474)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:572)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:144)
>     at 
> org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:228)
>     at 
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:207)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
>     at java.lang.Thread.run(Thread.java:748) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to