[
https://issues.apache.org/jira/browse/HUDI-8841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shuo Cheng updated HUDI-8841:
-----------------------------
Priority: Blocker (was: Major)
> Fix schema validating exception during flink async clustering
> -------------------------------------------------------------
>
> Key: HUDI-8841
> URL: https://issues.apache.org/jira/browse/HUDI-8841
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: Shuo Cheng
> Assignee: Shuo Cheng
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.0.1
>
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
>   at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
>   at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
>   at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
>   at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
>   at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
>   at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
>   at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
>   at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
>   at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
>   at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:256)
>   at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:200)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/var/folders/br/97x3mf4d32l8t6clbtjmnpdh0000gn/T/junit3978218215968849217/par4/ddf21ff1-2378-4027-8aa8-a20b113fdaed_2-4-0_20250108094450480.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>   ... 24 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required binary uuid (STRING) != optional binary uuid (STRING)
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
>   at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>   at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
>   ... 27 more
> {code}
>
> In Flink SQL, a primary key constraint can be declared on a table, which makes the type of the primary key field non-null. Spark has no primary key constraint, so the two engines produce discrepant schemas in the underlying files, e.g., Parquet (required vs. optional binary uuid in the trace above). We can perform a schema reconciliation during the async clustering read.
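> As a minimal, hypothetical sketch (not Hudi's actual API, names are illustrative), one way to reconcile is to relax a field to nullable unless both the requested schema and the file schema mark it required, which avoids the "required binary uuid != optional binary uuid" incompatibility shown above:
> {code:java}
> import java.util.LinkedHashMap;
> import java.util.Map;
>
> // Hypothetical helper: field name -> required flag (true = non-null).
> public class NullabilityReconciler {
>   public static Map<String, Boolean> reconcile(Map<String, Boolean> requested,
>                                                Map<String, Boolean> fileSchema) {
>     Map<String, Boolean> reconciled = new LinkedHashMap<>();
>     for (Map.Entry<String, Boolean> e : requested.entrySet()) {
>       Boolean inFile = fileSchema.get(e.getKey());
>       // Keep a field required only when BOTH schemas agree it is required;
>       // otherwise relax it to optional so the Parquet read schema matches.
>       reconciled.put(e.getKey(), e.getValue() && Boolean.TRUE.equals(inFile));
>     }
>     return reconciled;
>   }
>
>   public static void main(String[] args) {
>     Map<String, Boolean> requested = new LinkedHashMap<>();
>     requested.put("uuid", true);   // Flink side: primary key, non-null
>     Map<String, Boolean> fileSchema = new LinkedHashMap<>();
>     fileSchema.put("uuid", false); // Spark-written file: nullable
>     System.out.println(reconcile(requested, fileSchema)); // prints {uuid=false}
>   }
> }
> {code}
> The real fix would apply the same relaxation on the Avro/Parquet schema objects before constructing the clustering reader, rather than on a flag map.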
--
This message was sent by Atlassian Jira
(v8.20.10#820010)