Kenneth William Krugler created HUDI-5381:
---------------------------------------------
Summary: Class cast exception with Flink 1.15 source when reading
table written using bulk insert
Key: HUDI-5381
URL: https://issues.apache.org/jira/browse/HUDI-5381
Project: Apache Hudi
Issue Type: Bug
Components: flink
Reporter: Kenneth William Krugler
When running a unit test that reads records written by a Flink workflow using
MOR and bulk insert, the following exception occurs:
{{22/12/13 08:22:04 WARN taskmanager.Task:1104 - Source: Hudi Source -> Convert Row Data To Enriched Netflow (1/1)#5 (75289b6b98a35bc5d4522caaed3753a1) switched from RUNNING to FAILED with failure cause:
java.lang.NoSuchMethodError: org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader.nextRecord()Lorg/apache/flink/table/data/ColumnarRowData;
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:510)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:245)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:89)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)}}
MergeOnReadInputFormat (which is part of hudi-flink) uses
ParquetColumnarRowSplitReader to get the next record, and each Flink-specific
jar (hudi-flink1.13.x, hudi-flink1.14.x, hudi-flink1.15.x) has its own version
of ParquetColumnarRowSplitReader.
Unfortunately the nextRecord() method in this class returns Flink's
ColumnarRowData, which changed packages between Flink 1.14 and 1.15. So when
you use MergeOnReadInputFormat with Flink 1.15, you get the NoSuchMethodError
above, because (I assume) MergeOnReadInputFormat was compiled against 1.13 or
1.14, where the result type from nextRecord() is
org.apache.flink.table.data.ColumnarRowData, not
org.apache.flink.table.data.columnar.ColumnarRowData.
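The failure mode can be sketched in isolation. The JVM links a call site by method name plus the full method descriptor, which includes the return type, so relocating a return type to a new package breaks binary compatibility even though the method name is unchanged. A minimal model, using hypothetical stand-in classes (OldColumnarRowData for the pre-1.15 org.apache.flink.table.data.ColumnarRowData, NewColumnarRowData for the 1.15 org.apache.flink.table.data.columnar.ColumnarRowData, SplitReader for ParquetColumnarRowSplitReader):

```java
// Hypothetical stand-ins for the two package locations of ColumnarRowData.
class OldColumnarRowData {}  // role of org.apache.flink.table.data.ColumnarRowData (Flink <= 1.14)
class NewColumnarRowData {}  // role of org.apache.flink.table.data.columnar.ColumnarRowData (Flink 1.15)

// Stand-in for the 1.15 ParquetColumnarRowSplitReader: nextRecord() now
// declares the relocated class as its return type.
class SplitReader {
    NewColumnarRowData nextRecord() {
        return new NewColumnarRowData();
    }
}

public class DescriptorDemo {
    public static void main(String[] args) throws Exception {
        // A caller compiled against the 1.14-era reader looks up the
        // descriptor nextRecord()LOldColumnarRowData; at link time.
        // The only descriptor actually present at runtime is:
        String actual = SplitReader.class
                .getDeclaredMethod("nextRecord")
                .getReturnType()
                .getName();
        System.out.println("nextRecord() returns: " + actual);
        // Since no method matches the old descriptor, the JVM throws
        // NoSuchMethodError at the call site, even though a method
        // named nextRecord() still exists.
    }
}
```

This is why recompiling the caller against the matching hudi-flink1.15.x module resolves the lookup, while mixing jars built against different Flink versions does not.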
This error didn't happen previously when the Flink workflow was using insert
(versus bulk insert) to write the table...though I haven't dug into how this is
related.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)