voonhous opened a new pull request, #10247:
URL: https://github.com/apache/hudi/pull/10247

   …and decimal types
   
   ### Change Logs
   
   Hudi-on-Flink reads currently fail on Parquet files after either of these 
two COMPREHENSIVE schema evolution operations involving DECIMAL types is applied:
   
   1. **Adding** a DECIMAL field into a struct
   2. **Reordering** a DECIMAL field in a struct
   
   This PR aims to address the above Hudi-on-Flink read issues.
   
   **Error log for adding a decimal field into a struct**
   
   ```log
   ClassCastException: 
org.apache.hudi.table.format.cow.vector.ParquetDecimalVector cannot be cast to 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
       at 
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:547)
       at 
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:441)
       at 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.createWritableVectors(ParquetColumnarRowSplitReader.java:216)
       at 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:156)
       at 
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:149)
       at 
org.apache.hudi.table.format.RecordIterators.getParquetRecordIterator(RecordIterators.java:72)
       at 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:362)
       at 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIterator(MergeOnReadInputFormat.java:337)
       at 
org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:201)
       at 
org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:202)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
       at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
       at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
       at java.lang.Thread.run(Thread.java:748)
   ```
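
The trace above is consistent with a read-only wrapper vector being cast directly to a writable vector type. The following is a minimal, self-contained sketch of that failure mode, not Hudi or Flink code: `ReadableVector`, `WritableVector`, `HeapVector`, and `DecimalWrapper` are hypothetical stand-ins for the classes named in the trace, and unwrapping before the cast is only one plausible way to avoid the exception, not necessarily what this PR does.

```java
public class VectorCastSketch {
    // Hypothetical stand-ins for the vector types named in the stack trace.
    interface ReadableVector { boolean isNullAt(int i); }
    interface WritableVector extends ReadableVector { void setNullAt(int i); }

    // A plain writable vector that only tracks null flags.
    static final class HeapVector implements WritableVector {
        private final boolean[] nulls;
        HeapVector(int size) { nulls = new boolean[size]; }
        public boolean isNullAt(int i) { return nulls[i]; }
        public void setNullAt(int i) { nulls[i] = true; }
    }

    // Read-only decorator: wraps a vector but is not itself writable.
    static final class DecimalWrapper implements ReadableVector {
        final ReadableVector inner;
        DecimalWrapper(ReadableVector inner) { this.inner = inner; }
        public boolean isNullAt(int i) { return inner.isNullAt(i); }
    }

    // Blind cast: throws ClassCastException when given a DecimalWrapper.
    static WritableVector castDirectly(ReadableVector v) {
        return (WritableVector) v;
    }

    // Unwrap the decorator first, then cast the inner vector.
    static WritableVector unwrapThenCast(ReadableVector v) {
        ReadableVector target = (v instanceof DecimalWrapper) ? ((DecimalWrapper) v).inner : v;
        return (WritableVector) target;
    }

    // Reports whether the blind cast fails for the given vector.
    static boolean directCastFails(ReadableVector v) {
        try {
            castDirectly(v);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ReadableVector wrapped = new DecimalWrapper(new HeapVector(4));
        System.out.println("direct cast fails: " + directCastFails(wrapped));
        WritableVector w = unwrapThenCast(wrapped);
        w.setNullAt(0);
        System.out.println("unwrapped vector is writable: " + w.isNullAt(0));
    }
}
```

The point of the sketch is only that a decorator type and the type it wraps are not interchangeable in a cast, which is why a nested (struct) field that gets wrapped can fail where a top-level field does not.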
   
   **Error log for reordering a decimal field in a struct**
   
   ```log
   ClassCastException: 
org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be cast 
to org.apache.flink.table.data.columnar.vector.DecimalColumnVector
       at 
org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getDecimal(VectorizedColumnBatch.java:118)
       at 
org.apache.flink.table.data.columnar.ColumnarRowData.getDecimal(ColumnarRowData.java:128)
       at 
org.apache.flink.table.data.RowData.lambda$createFieldGetter$89bd9445$1(RowData.java:233)
       at 
org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
       at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
       at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
       at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
       at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
       at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:474)
       at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:572)
       at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:144)
       at 
org.apache.hudi.source.StreamReadOperator.consumeAsMiniBatch(StreamReadOperator.java:228)
       at 
org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:207)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:822)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:771)
       at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
       at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
       at java.lang.Thread.run(Thread.java:748) 
   ```
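
The trace above is the mirror image of the first one: a raw bytes vector reaches a code path that expects a decimal-capable vector. As a hedged illustration only, the sketch below uses hypothetical types (`ColumnVector`, `BytesVector`, `DecimalVector`, `HeapBytes`, `BytesBackedDecimal`) to show why the cast fails and how an adapter that decodes the bytes can satisfy the decimal interface. It assumes the bytes hold the unscaled value in big-endian two's-complement form, which is how Parquet encodes binary-backed decimals; it is not the actual Hudi or Flink implementation.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalAdapterSketch {
    // Hypothetical stand-ins for the vector types named in the stack trace.
    interface ColumnVector {}
    interface BytesVector extends ColumnVector { byte[] getBytes(int i); }
    interface DecimalVector extends ColumnVector { BigDecimal getDecimal(int i, int scale); }

    // Raw storage: one byte array per row, no notion of decimals.
    static final class HeapBytes implements BytesVector {
        private final byte[][] rows;
        HeapBytes(byte[][] rows) { this.rows = rows; }
        public byte[] getBytes(int i) { return rows[i]; }
    }

    // Adapter: exposes a bytes vector through the decimal interface.
    static final class BytesBackedDecimal implements DecimalVector {
        private final BytesVector bytes;
        BytesBackedDecimal(BytesVector bytes) { this.bytes = bytes; }
        public BigDecimal getDecimal(int i, int scale) {
            // Interpret the bytes as a big-endian two's-complement unscaled value.
            return new BigDecimal(new BigInteger(bytes.getBytes(i)), scale);
        }
    }

    // Mimics a row accessor that assumes every decimal column is a DecimalVector.
    static BigDecimal readDecimal(ColumnVector column, int row, int scale) {
        return ((DecimalVector) column).getDecimal(row, scale);
    }

    public static void main(String[] args) {
        HeapBytes raw = new HeapBytes(new byte[][] {BigInteger.valueOf(12345).toByteArray()});
        try {
            readDecimal(raw, 0, 2);
        } catch (ClassCastException e) {
            System.out.println("raw bytes vector is not a decimal vector");
        }
        System.out.println(readDecimal(new BytesBackedDecimal(raw), 0, 2));
    }
}
```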
   
   
   ### Impact
   
   None
   
   ### Risk level (write none, low, medium or high below)
   
   None
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, 
config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the 
default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. 
Please create a Jira ticket, attach the
     ticket number here and follow the 
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to 
make
     changes to the website._
   
   ### Contributor's checklist
   
   - [X] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


