[
https://issues.apache.org/jira/browse/HUDI-7173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Chen closed HUDI-7173.
----------------------------
Resolution: Fixed
Fixed via master branch: 2aa91ca24e2d3b52c403441215c45b3830d722ff
> Fix Hudi-on-Flink reads after schema evolution is applied on Decimal fields
> ---------------------------------------------------------------------------
>
> Key: HUDI-7173
> URL: https://issues.apache.org/jira/browse/HUDI-7173
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Voon Hou
> Assignee: voon
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.14.1, 1.0.0
>
>
> h1. Introduction
> Hudi-on-Flink reads currently fail on Parquet files after either of the
> following two COMPREHENSIVE schema evolution operations involving DECIMAL types is applied:
> # *Adding* a DECIMAL field into a struct
> # *Reordering* a DECIMAL field in a struct
>
> The stack traces produced when attempting to perform these reads are provided below.
> h2. *Adding* a DECIMAL field into a struct
> {code:java}
> ClassCastException:
> org.apache.hudi.table.format.cow.vector.ParquetDecimalVector cannot be cast
> to org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
> at
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:547)
> at
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:441)
> at
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216)
> at
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156)
> at
> org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:149)
> at
> org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:72)
> at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:362)
> at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:337)
> at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:201)
> at
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:202)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
> at java.lang.Thread.run(Thread.java:748){code}
> h2. *Reordering* a DECIMAL field in a struct
> {code:java}
> ClassCastException:
> org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be
> cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
> at
> org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
> at
> org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
> at
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
> at
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:474)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:572)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:144)
> at
> org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:228)
> at
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:207)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
> at java.lang.Thread.run(Thread.java:748) {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)