Shuo Cheng created HUDI-8841:
--------------------------------

             Summary: Fix schema validation exception during Flink async 
clustering
                 Key: HUDI-8841
                 URL: https://issues.apache.org/jira/browse/HUDI-8841
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink
            Reporter: Shuo Cheng
             Fix For: 1.0.1


{code:java}
Caused by: org.apache.hudi.exception.HoodieException: unable to read next 
record from parquet file 
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at 
org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at 
org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at 
java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at 
java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
    at 
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
    at 
java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
    at 
java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at 
org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
    at 
org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:256)
    at 
org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:200)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
file:/var/folders/br/97x3mf4d32l8t6clbtjmnpdh0000gn/T/junit3978218215968849217/par4/ddf21ff1-2378-4027-8aa8-a20b113fdaed_2-4-0_20250108094450480.parquet
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 24 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema 
is not compatible with the file schema. incompatible types: required binary 
uuid (STRING) != optional binary uuid (STRING)
    at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
    at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
    at 
org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at 
org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
    at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
    ... 27 more
 {code}
 

In Flink SQL, a primary key constraint can be defined, which makes the type of 
the pk field non-null. Spark has no primary key constraint, so the two engines 
produce different schemas in the underlying files, e.g., Parquet (required 
binary uuid vs. optional binary uuid, as in the trace above).

We can perform a schema reconciliation during the async clustering read.
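To illustrate the idea (this is a simplified model, not the actual Hudi/Parquet API): represent a schema as a map from field name to a "required" flag, and relax the requested schema's required fields to optional whenever the file schema disagrees, so the Parquet reader does not reject the file. The class and method names here are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaReconcile {
    /**
     * Hypothetical reconciliation sketch: a field stays "required" only
     * when BOTH the requested schema (from Flink SQL, where the pk is
     * NOT NULL) and the file schema (e.g., written by Spark, where the
     * same field is optional) mark it required; otherwise it is relaxed
     * to optional, matching the file.
     */
    public static Map<String, Boolean> reconcile(Map<String, Boolean> requested,
                                                 Map<String, Boolean> file) {
        Map<String, Boolean> out = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> e : requested.entrySet()) {
            // Fields missing from the file schema are treated as optional.
            boolean required = e.getValue() && file.getOrDefault(e.getKey(), false);
            out.put(e.getKey(), required);
        }
        return out;
    }
}
```

With a `required uuid` in the requested schema and an `optional uuid` in the file, the reconciled schema carries `optional uuid`, avoiding the `incompatible types` check in `ColumnIOFactory`.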



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
