[
https://issues.apache.org/jira/browse/HUDI-8954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated HUDI-8954:
---------------------------------
Labels: pull-request-available (was: )
> Reconstruct writer schema while clustering by spark row writer
> --------------------------------------------------------------
>
> Key: HUDI-8954
> URL: https://issues.apache.org/jira/browse/HUDI-8954
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: joy2025
> Priority: Major
> Labels: pull-request-available
>
> In the clustering scenario, non-nullable fields will be converted to nullable if we
> enable the row writer in Spark.
> When Spark reads records through HadoopFsRelation, it passes a nullable schema:
> {code:java}
> // Spark-sql_2.12-3.5.3
> // DataSource(line: 414)
> HadoopFsRelation(
>   fileCatalog,
>   partitionSchema = partitionSchema,
>   dataSchema = dataSchema.asNullable,
>   bucketSpec = bucketSpec,
>   format,
>   caseInsensitiveOptions)(sparkSession)
> {code}
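> For illustration, here is a minimal sketch (not Spark or Hudi code) of what this relaxation does to a schema. Spark's own asNullable is package-private, so the helper below only mimics its behavior for top-level and nested struct fields:
> {code:scala}
> import org.apache.spark.sql.types._
>
> // Mimics what Spark's StructType.asNullable does: every field is marked nullable,
> // recursing into nested structs (arrays and maps are omitted in this sketch).
> def relaxNullability(schema: StructType): StructType =
>   StructType(schema.fields.map { f =>
>     f.dataType match {
>       case st: StructType => f.copy(dataType = relaxNullability(st), nullable = true)
>       case _              => f.copy(nullable = true)
>     }
>   })
>
> val dataSchema = StructType(Seq(
>   StructField("id", IntegerType, nullable = false),
>   StructField("name", StringType, nullable = true)))
>
> // After relaxation "id" has lost its NOT NULL constraint, which is what the
> // row writer later persists into the Parquet footer as "optional int32 id".
> val readSchema = relaxNullability(dataSchema)
> assert(readSchema("id").nullable)
> {code}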
> When Spark saves these records into a Parquet file, it reuses the read schema
> that was passed into HadoopFsRelation earlier.
>
> {code:java}
> // Hudi: master
> // HoodieDatasetBulkInsertHelper(lines: 150)
> def bulkInsert(dataset: Dataset[Row],
>                instantTime: String,
>                table: HoodieTable[_, _, _, _],
>                writeConfig: HoodieWriteConfig,
>                arePartitionRecordsSorted: Boolean,
>                shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
>   val schema = dataset.schema
>   HoodieJavaRDD.of(
>     injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
>       val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
>       val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
>       val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
>       val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
>       ...
> }
> {code}
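> A possible direction for the fix (a rough sketch only, not the actual patch) is to rebuild the writer schema from the table's Avro schema, which still carries the original nullability, instead of reusing dataset.schema. This assumes the Avro write schema is available from writeConfig and relies on Hudi's AvroConversionUtils:
> {code:scala}
> import org.apache.avro.Schema
> import org.apache.hudi.AvroConversionUtils
>
> // Parse the table's Avro write schema (assumed to be set on the write config).
> val avroSchema = new Schema.Parser().parse(writeConfig.getSchema)
>
> // Convert it back to a Spark StructType: fields that are not declared as a union
> // with "null" in Avro come back as nullable = false, restoring the original constraints.
> val writerStructType = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
>
> // Use writerStructType instead of dataset.schema when building the row writer,
> // so the Parquet files keep "required" for non-nullable columns.
> {code}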
>
> If we disable the row writer or use Flink to execute clustering next time, this
> may cause a schema conflict between the reader schema (Avro schema) and the
> writer schema (Parquet schema).
> {code:java}
> [ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140)Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
> org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
>   at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
>   at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
>   at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
>   at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
>   at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
>   at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
>   at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
>   at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
>   at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
>   at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
>   at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
>   at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>   at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
>   ... 15 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
>   at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
>   at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>   at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
>   ... 18 more
> {code}
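> To confirm the diagnosis on an already-written file, one can inspect the Parquet footer directly (a minimal sketch; the path below is only an example, and the parquet-hadoop calls are standard API):
> {code:scala}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.hadoop.ParquetFileReader
> import org.apache.parquet.hadoop.util.HadoopInputFile
>
> // Open the file written by the row writer and print its Parquet message type.
> // Seeing "optional int32 id" in the output means the NOT NULL constraint was dropped.
> val input = HadoopInputFile.fromPath(new Path("file:/tmp/hudi/<table>/<file>.parquet"), new Configuration())
> val reader = ParquetFileReader.open(input)
> try {
>   println(reader.getFooter.getFileMetaData.getSchema)
> } finally {
>   reader.close()
> }
> {code}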
--
This message was sent by Atlassian Jira
(v8.20.10#820010)