Voon Hou created HUDI-7173:
------------------------------
Summary: Fix Hudi-on-Flink reads after schema evolution is applied
on Decimal fields
Key: HUDI-7173
URL: https://issues.apache.org/jira/browse/HUDI-7173
Project: Apache Hudi
Issue Type: Bug
Reporter: Voon Hou
h1. Introduction
Hudi-on-Flink reads currently fails when reading parquet files after these two
COMPREHENSIVE schema evolution involving DECIMAL types are applied:
# *Adding* a DECIMAL field into a struct
# *Reordering* a DECIMAL field in a struct
The error stack traces are provided below when trying perform reads.
h2. *Adding* a DECIMAL field into a struct
{code:java}
ClassCastException:
org.apache.hudi.table.format.cow.vector.ParquetDecimalVector cannot be cast to
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
at
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:547)
at
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:441)
at
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216)
at
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156)
at
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:149)
at
org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:72)
at
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:362)
at
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:337)
at
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:201)
at
org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:202)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
at java.lang.Thread.run(Thread.java:748){code}
h2. *Reordering* a DECIMAL field in a struct
{code:java}
ClassCastException:
org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be cast
to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
at
org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
at
org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
at
org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
at
org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:474)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:572)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:144)
at
org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:228)
at
org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:207)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
at java.lang.Thread.run(Thread.java:748) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)