Voon Hou created HUDI-7173:
------------------------------

             Summary: Fix Hudi-on-Flink reads after schema evolution is applied on Decimal fields
                 Key: HUDI-7173
                 URL: https://issues.apache.org/jira/browse/HUDI-7173
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Voon Hou


h1. Introduction

Hudi-on-Flink reads currently fail on parquet files after the following two 
COMPREHENSIVE schema evolution operations involving DECIMAL types are applied:
 # *Adding* a DECIMAL field into a struct
 # *Reordering* a DECIMAL field in a struct

 

The stack traces produced when attempting reads are provided below.
h2. *Adding* a DECIMAL field into a struct
{code:java}
ClassCastException: org.apache.hudi.table.format.cow.vector.ParquetDecimalVector cannot be cast to org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:547)
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:441)
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216)
    at org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156)
    at org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:149)
    at org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:72)
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:362)
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:337)
    at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:201)
    at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
    at java.lang.Thread.run(Thread.java:748)
{code}
h2. *Reordering* a DECIMAL field in a struct
{code:java}
ClassCastException: org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be cast to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
    at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
    at org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:474)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:572)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:144)
    at org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:228)
    at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:207)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
    at java.lang.Thread.run(Thread.java:748)
{code}
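The two exceptions above read like opposite sides of the same mismatch: in one case a decimal wrapper is handed to code that expects a writable vector, in the other a raw bytes vector is handed to code that expects a decimal reader. The sketch below reproduces only that shape, using stand-in types. {{DecimalReader}}, {{Writable}}, {{HeapBytes}} and {{DecimalWrapper}} are hypothetical names introduced for illustration, and the type relationships are an assumption inferred from the exception messages, not taken from the Hudi or Flink sources. It does not identify the root cause.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Stand-in types only: none of these are the real Flink or Hudi classes.
class DecimalVectorCastSketch {

    // Hypothetical read-side interface, playing the role of DecimalColumnVector.
    interface DecimalReader {
        BigDecimal getDecimal(int row, int scale);
    }

    // Hypothetical write-side interface, playing the role of WritableColumnVector.
    interface Writable {
        void putBytes(int row, byte[] value);
    }

    // Raw byte storage: writable, but with no decimal accessor (role of HeapBytesVector).
    static final class HeapBytes implements Writable {
        private final byte[][] rows = new byte[16][];

        @Override
        public void putBytes(int row, byte[] value) {
            rows[row] = value;
        }

        byte[] getBytes(int row) {
            return rows[row];
        }
    }

    // Decimal view over byte storage: readable as decimals, but not writable
    // (role of ParquetDecimalVector, by assumption).
    static final class DecimalWrapper implements DecimalReader {
        final HeapBytes delegate;

        DecimalWrapper(HeapBytes delegate) {
            this.delegate = delegate;
        }

        @Override
        public BigDecimal getDecimal(int row, int scale) {
            // Interpret the stored bytes as a big-endian unscaled value.
            return new BigDecimal(new BigInteger(delegate.getBytes(row)), scale);
        }
    }

    // Returns true when casting the vector to the target type throws ClassCastException.
    static boolean castFails(Object vector, Class<?> target) {
        try {
            target.cast(vector);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        HeapBytes raw = new HeapBytes();
        raw.putBytes(0, BigInteger.valueOf(12345).toByteArray());
        DecimalWrapper wrapped = new DecimalWrapper(raw);

        // Same shape as the first trace: decimal wrapper used where a writable vector is expected.
        System.out.println(castFails(wrapped, Writable.class));
        // Same shape as the second trace: raw byte storage used where a decimal reader is expected.
        System.out.println(castFails(raw, DecimalReader.class));
        // The pairing that works: write through the raw vector, read through the wrapper.
        System.out.println(wrapped.getDecimal(0, 2));
    }
}
```

Under these assumptions, a reader that builds nested (struct) vectors would need to keep the writable vector and the decimal view as two separate references, writing through the first and exposing the second to row accessors. Whether that matches the actual fix is left to the patch for this issue.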
 


