joy2025 created HUDI-8954:
-----------------------------
Summary: Reconstruct writer schema while clustering by spark row writer
Key: HUDI-8954
URL: https://issues.apache.org/jira/browse/HUDI-8954
Project: Apache Hudi
Issue Type: Bug
Reporter: joy2025
In the clustering scenario, non-nullable fields are converted to nullable ones
if the Spark row writer is enabled.
When Spark reads records through HadoopFsRelation, it passes a nullable schema:
{code:java}
// Spark-sql_2.12-3.5.3
// DataSource(line: 414)
HadoopFsRelation(
  fileCatalog,
  partitionSchema = partitionSchema,
  dataSchema = dataSchema.asNullable,
  bucketSpec = bucketSpec,
  format,
  caseInsensitiveOptions)(sparkSession)
{code}
When Spark saves these records into parquet files, it reuses the read schema
that was passed into HadoopFsRelation above.
{code:java}
// Hudi: master
// HoodieDatasetBulkInsertHelper(lines: 150)
def bulkInsert(dataset: Dataset[Row],
               instantTime: String,
               table: HoodieTable[_, _, _, _],
               writeConfig: HoodieWriteConfig,
               arePartitionRecordsSorted: Boolean,
               shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
  // dataset.schema here is the read schema, which HadoopFsRelation
  // has already converted with asNullable
  val schema = dataset.schema
  HoodieJavaRDD.of(
    injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
      val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
      val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
      val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
      val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
      ...
}
{code}
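One possible direction for the fix, matching the issue title (a sketch only, not a tested patch; it assumes Hudi's existing AvroConversionUtils helper and HoodieWriteConfig#getSchema): instead of taking dataset.schema, which HadoopFsRelation has already forced to be nullable, reconstruct the writer StructType from the table's Avro schema so that field nullability is preserved:
{code:scala}
// Sketch only: rebuild the writer schema from the table's Avro schema
// instead of trusting dataset.schema, which is already asNullable.
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils

val writerAvroSchema = new Schema.Parser().parse(writeConfig.getSchema)
// Avro models nullable fields as union(null, type), so the converted
// StructType keeps the original required/optional information.
val writerStructType = AvroConversionUtils.convertAvroSchemaToStructType(writerAvroSchema)
{code}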
If we later disable the row writer or use Flink to execute clustering, this may
cause a schema conflict between the reader schema (Avro schema) and the writer
schema (parquet schema):
{code:java}
[ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140) Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
    at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
    at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
    at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
    at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
    ... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
    at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
    at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
    at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
    ... 18 more
{code}
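The nullability flip can be confirmed by inspecting the footer of the parquet file written by the row-writer clustering (a sketch assuming the parquet-tools CLI is available; the path below is illustrative, not the one from the trace):
{code}
# Assumes parquet-tools is installed; path is illustrative.
parquet-tools schema /path/to/partition/some-clustered-file.parquet
# A field affected by this bug is printed as "optional int32 id"
# even though the table schema declares it as required.
{code}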