joy2025 created HUDI-8954:
-----------------------------

             Summary: Reconstruct writer schema while clustering by spark row writer
                 Key: HUDI-8954
                 URL: https://issues.apache.org/jira/browse/HUDI-8954
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: joy2025


In a clustering scenario, non-nullable fields are converted to nullable if we enable the row writer in Spark (see the sketch below for how that path is enabled).
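For context, here is a minimal sketch of how this row-writer path is typically enabled. The option keys are standard Hudi write options; df, the record key field, and the paths are placeholders:
{code:java}
// Sketch: enabling the Spark row writer together with inline clustering.
// All option keys are standard Hudi write options; df and the paths are illustrative.
df.write.format("hudi").
  option("hoodie.table.name", "hudi_cow_sink3").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.operation", "bulk_insert").
  option("hoodie.datasource.write.row.writer.enable", "true"). // the row writer in question
  option("hoodie.clustering.inline", "true").
  mode("append").
  save("/tmp/hudi/hudi_cow_sink3")
{code}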

When Spark reads records through HadoopFsRelation, it passes a nullable schema:
{code:java}
// Spark-sql_2.12-3.5.3
// DataSource(line: 414)

HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)
{code}
When Spark later saves these records into Parquet files, it uses the same nullable read schema that it passed into HadoopFsRelation.
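The nullability loss is easy to observe in isolation. A minimal sketch (spark-shell style; the output path is illustrative):
{code:java}
// A non-nullable column loses its constraint once the data is read back
// through the file source.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("id")
println(df.schema("id").nullable)   // false: Int columns start out non-nullable
df.write.mode("overwrite").parquet("/tmp/nullable_repro")

// The read goes through HadoopFsRelation, which applies dataSchema.asNullable
val readBack = spark.read.parquet("/tmp/nullable_repro")
println(readBack.schema("id").nullable)   // true: the constraint is gone
{code}
In Hudi's row-writer bulk insert path, that nullable schema is then captured directly: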

{code:java}
// Hudi: master
// HoodieDatasetBulkInsertHelper (line 150)

def bulkInsert(dataset: Dataset[Row],
               instantTime: String,
               table: HoodieTable[_, _, _, _],
               writeConfig: HoodieWriteConfig,
               arePartitionRecordsSorted: Boolean,
               shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
    // dataset.schema is the already-nullable schema coming from HadoopFsRelation,
    // so the table's nullability constraints are lost at this point
    val schema = dataset.schema
    HoodieJavaRDD.of(
      injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
        val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
        val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
        val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
        val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
        ...
{code}
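One possible direction for a fix (a sketch only, not the committed patch): rebuild the writer StructType from the table's Avro schema instead of trusting dataset.schema, so the original nullability survives. AvroConversionUtils.convertAvroSchemaToStructType and HoodieWriteConfig#getWriteSchema are existing Hudi APIs; the wiring into bulkInsert is assumed:
{code:java}
// Sketch: derive the writer schema from the table's Avro schema rather than
// from the incoming dataset (assumed wiring; not the committed fix).
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.types.StructType

def reconstructWriterSchema(writeConfig: HoodieWriteConfig): StructType = {
  // getWriteSchema returns the table's Avro schema as a string
  val avroSchema = new Schema.Parser().parse(writeConfig.getWriteSchema)
  // Avro fields that are not a union with "null" stay non-nullable in the StructType
  AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
}
{code}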

If we later disable the row writer, or use Flink to execute the next clustering, this can cause a schema conflict between the reader schema (Avro schema) and the writer schema (Parquet schema):
{code:java}
[ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140) Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
... 18 more
{code}
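The root failure is Parquet's strict compatibility check in ColumnIOFactory. It can be reproduced directly against parquet-column with the two schema variants involved (a sketch; the message layout below is illustrative):
{code:java}
// Sketch: reproducing the ColumnIOFactory compatibility failure seen above.
import org.apache.parquet.io.ColumnIOFactory
import org.apache.parquet.schema.MessageTypeParser

// The reader schema derived from the table's Avro schema keeps "id" required...
val requested = MessageTypeParser.parseMessageType("message record { required int32 id; }")
// ...while the file written through the row writer carries the nullable variant.
val fileSchema = MessageTypeParser.parseMessageType("message record { optional int32 id; }")

// The same check InternalParquetRecordReader#checkRead performs; this throws
// ParquetDecodingException: "... required int32 id != optional int32 id"
new ColumnIOFactory().getColumnIO(requested, fileSchema)
{code}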



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
