JoyJoyJo opened a new issue, #12561:
URL: https://github.com/apache/hudi/issues/12561
**Describe the problem you faced**
I'm using a Flink job to append data into a COW table. When async clustering
was triggered, the job threw the exception below:
```
[ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140)
Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
	at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
	at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
	at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
	at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
	at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
	at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
	... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
	at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
	at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
	... 18 more
```
Previously, I had stopped the Flink job and run clustering once via the Spark
`run_clustering` procedure, then restarted the Flink job. The latest
clustering plan generated by the Flink job contains some files written by Spark.
**To Reproduce**
Steps to reproduce the behavior (locally):
1. Start a Flink job to append some data into a COW table (with
`clustering.async.enabled` turned off).
2. Stop the Flink job and execute clustering via the Spark `run_clustering` procedure.
3. Restart the Flink job (with `clustering.async.enabled` and
`clustering.schedule.enabled` turned on).
In step 3, the clustering plan generated by the Flink job contains some
files written in step 2.
**Expected behavior**
Is this a bug? Any advice on how to resolve this conflict would be appreciated.
**Environment Description**
* Hudi version : 0.13.1
* Spark version : 2.4/3.4
* Flink version: 1.16
**Additional context**
The repetition property of the primary-key field `id` in the Parquet schema
written by Flink is `REQUIRED`, but it is `OPTIONAL` in files written by Spark.
Parquet schema written by Flink (condensed from the verbose JSON schema dump
into Parquet message notation; the omitted properties were all defaults):
```
message flink_schema {
  optional binary _hoodie_commit_time;
  optional binary _hoodie_commit_seqno;
  optional binary _hoodie_record_key;
  optional binary _hoodie_partition_path;
  optional binary _hoodie_file_name;
  required int32 id;
  optional binary name;
  optional int32 dt;
  optional int64 ts;
}
```
Parquet schema written by Spark (condensed the same way; identical except for
the `id` field):
```
message spark_schema {
  optional binary _hoodie_commit_time;
  optional binary _hoodie_commit_seqno;
  optional binary _hoodie_record_key;
  optional binary _hoodie_partition_path;
  optional binary _hoodie_file_name;
  optional int32 id;
  optional binary name;
  optional int32 dt;
  optional int64 ts;
}
```
**Stacktrace**
Included in the problem description above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]