[ 
https://issues.apache.org/jira/browse/HUDI-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-8954:
---------------------------------
    Labels: pull-request-available  (was: )

> Reconstruct writer schema while clustering by spark row writer
> --------------------------------------------------------------
>
>                 Key: HUDI-8954
>                 URL: https://issues.apache.org/jira/browse/HUDI-8954
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: joy2025
>            Priority: Major
>              Labels: pull-request-available
>
> In clustering scenario,non-nullable fields will be convert to nullable if we 
> enable row writer in spark.
> When Spark read records through HadoopFsRelation, it passes a nullable schema:
> {code:java}
> // Spark-sql_2.12-3.5.3
> // DataSource(line: 414)
> HadoopFsRelation(
>         fileCatalog,
>         partitionSchema = partitionSchema,
>         dataSchema = dataSchema.asNullable,
>         bucketSpec = bucketSpec,
>         format,
>         caseInsensitiveOptions)(sparkSession)
> {code}
> When spark save these records into parquet file, it uses the read schema that 
> it pass into HadoopFsRelation before.
>  
> {code:java}
> // Hudi: master
> // HoodieDatasetBulkInsertHelper(lines: 150)
> def bulkInsert(dataset: Dataset[Row],
>                  instantTime: String,
>                  table: HoodieTable[_, _, _, _],
>                  writeConfig: HoodieWriteConfig,
>                  arePartitionRecordsSorted: Boolean,
>                  shouldPreserveHoodieMetadata: Boolean): 
> HoodieData[WriteStatus] = {
>     val schema = dataset.schema
>     HoodieJavaRDD.of(
>       injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
>         val taskContextSupplier: TaskContextSupplier = 
> table.getTaskContextSupplier
>         val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
>         val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
>         val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
>         ...
>   } {code}
>  
> If we disable row writer or use flink to execute clustering next time, it may 
> cause schema conflict between reader schema (avro schema) and writer schema 
> (parquet schema).
> {code:java}
> [ERROR] 2025-01-01 01:04:46,869 
> method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140)Executor
>  executes action [Execute clustering for instant 20250101010426430 from task 
> 10] error
> org.apache.hudi.exception.HoodieException: unable to read next record from 
> parquet file
> at 
> org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
> at 
> org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
> at 
> org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
> at 
> java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
> at 
> java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
> at 
> java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
> at 
> java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
> at 
> java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
> at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
> at 
> org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
> at 
> org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
> at 
> org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
> at 
> org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
> at 0 in block -1 in file 
> file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
> at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
> at 
> org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
> ... 15 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: The requested 
> schema is not compatible with the file schema. incompatible types: required 
> int32 id != optional int32 id
> at 
> org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
> at 
> org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
> at 
> org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
> at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
> at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
> ... 18 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to