[ 
https://issues.apache.org/jira/browse/HUDI-8841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Cheng updated HUDI-8841:
-----------------------------
    Priority: Blocker  (was: Major)

> Fix schema validating exception during flink async clustering
> -------------------------------------------------------------
>
>                 Key: HUDI-8841
>                 URL: https://issues.apache.org/jira/browse/HUDI-8841
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: Shuo Cheng
>            Assignee: Shuo Cheng
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.0.1
>
>
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
>     at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
>     at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
>     at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
>     at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
>     at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
>     at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
>     at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
>     at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
>     at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
>     at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:256)
>     at org.apache.hudi.sink.clustering.ClusteringOperator.processElement(ClusteringOperator.java:200)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/var/folders/br/97x3mf4d32l8t6clbtjmnpdh0000gn/T/junit3978218215968849217/par4/ddf21ff1-2378-4027-8aa8-a20b113fdaed_2-4-0_20250108094450480.parquet
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     ... 24 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required binary uuid (STRING) != optional binary uuid (STRING)
>     at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
>     at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
>     at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
>     at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>     at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
>     ... 27 more
> {code}
>  
> In Flink SQL, a primary key constraint can be defined, which makes the type of the pk field non-null (required), while Spark has no primary key constraint. As a result, the two engines write discrepant schemas into the underlying files, e.g., Parquet.
> We can reconcile the schemas when reading the files during async clustering.
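The reconciliation idea can be sketched with a simplified schema model (the `Field` record and `reconcile` method below are illustrative only, not Hudi's or Parquet's actual API, which uses `org.apache.parquet.schema.*`): when the requested schema marks a field as required but the file schema marks the same field as optional, relax the requested field to optional before reading, so the reader does not fail with "incompatible types: required ... != optional ...".

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Minimal stand-in for a Parquet field: name, logical type, and repetition.
// Hypothetical model for illustration; real code operates on MessageType.
public class SchemaReconcile {
    record Field(String name, String type, boolean required) {}

    // Relax any field that the requested schema marks as required but the
    // file schema marks as optional. Fields missing from the file schema,
    // or already compatible, are passed through unchanged.
    static List<Field> reconcile(List<Field> requested, List<Field> file) {
        Map<String, Field> fileByName = file.stream()
                .collect(Collectors.toMap(Field::name, Function.identity()));
        return requested.stream()
                .map(f -> {
                    Field inFile = fileByName.get(f.name());
                    boolean relax = f.required() && inFile != null && !inFile.required();
                    return relax ? new Field(f.name(), f.type(), false) : f;
                })
                .toList();
    }

    public static void main(String[] args) {
        // Flink-written schema: pk field is non-null (required binary uuid).
        List<Field> requested = List.of(new Field("uuid", "binary(STRING)", true));
        // Spark-written file: same field, but optional.
        List<Field> fileSchema = List.of(new Field("uuid", "binary(STRING)", false));

        List<Field> reconciled = reconcile(requested, fileSchema);
        System.out.println(reconciled.get(0).required()); // prints "false"
    }
}
```

In the real fix the relaxation would happen where the clustering read path builds the requested schema for `ParquetReader`, so a Flink-derived required pk field can still read Spark-written files.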



--
This message was sent by Atlassian Jira
(v8.20.10#820010)