JoyJoyJo opened a new issue, #12561:
URL: https://github.com/apache/hudi/issues/12561

   
   **Describe the problem you faced**
   
   I'm using a Flink job to append data into a COW table. When async clustering was triggered, the job threw the exception below:
   ```
   [ERROR] 2025-01-01 01:04:46,869 method:org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:140)
   Executor executes action [Execute clustering for instant 20250101010426430 from task 10] error
   org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
        at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
        at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
        at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
        at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
        at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
        at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
        at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
        at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
        at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45)
        at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:264)
        at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:192)
        at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/hudi/hudi_cow_sink3/1/6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
        ... 15 more
   Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required int32 id != optional int32 id
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81)
        at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57)
        at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
        at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
        ... 18 more
   ```
   Previously, I had stopped the Flink job and run clustering once via the Spark `run_clustering` procedure, then restarted the Flink job. The latest clustering plan generated by the Flink job contains some files written by Spark.
   
   **To Reproduce**
   
   Steps to reproduce the behavior (locally):
   
   1. Start a Flink job that appends data into a COW table (with `clustering.async.enabled` turned off).
   2. Stop the Flink job and run clustering once via the Spark `run_clustering` procedure.
   3. Restart the Flink job (with `clustering.async.enabled` and `clustering.schedule.enabled` turned on).
   
   In step 3, the clustering plan generated by the Flink job contains some files written in step 2; a minimal sketch of these steps is shown after the list.
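   
   To make the steps concrete, here is a minimal sketch (Flink Table API, Java). The table name, path, and column types are illustrative, inferred from the schema dumps below rather than copied from my exact job:
   
   ```java
   // Reproduction sketch (illustrative): table name, path, and columns are
   // assumptions inferred from the schema dumps below, not my exact job.
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class ClusteringConflictRepro {
     public static void main(String[] args) {
       TableEnvironment tEnv =
           TableEnvironment.create(EnvironmentSettings.inStreamingMode());
   
       // Step 1: append-only Flink job with clustering disabled. `id` is the
       // primary key and therefore NOT NULL, which Flink's parquet writer
       // records as a REQUIRED field.
       tEnv.executeSql(
           "CREATE TABLE hudi_cow_sink3 ("
               + "  id INT NOT NULL,"
               + "  name STRING,"
               + "  dt INT,"
               + "  ts BIGINT,"
               + "  PRIMARY KEY (id) NOT ENFORCED"
               + ") WITH ("
               + "  'connector' = 'hudi',"
               + "  'path' = 'file:///tmp/hudi/hudi_cow_sink3',"
               + "  'table.type' = 'COPY_ON_WRITE',"
               + "  'write.operation' = 'insert',"
               + "  'clustering.schedule.enabled' = 'false',"
               + "  'clustering.async.enabled' = 'false'"
               + ")");
       tEnv.executeSql("INSERT INTO hudi_cow_sink3 VALUES (1, 'a', 1, 1000)");
   
       // Step 2: with the Flink job stopped, cluster once from Spark, e.g. in
       // spark-sql with the Hudi bundle and SQL extensions on the classpath:
       //   CALL run_clustering(path => 'file:///tmp/hudi/hudi_cow_sink3');
   
       // Step 3: restart this job with 'clustering.schedule.enabled' = 'true'
       // and 'clustering.async.enabled' = 'true'; the next clustering plan
       // includes the Spark-written files and the ClusteringOperator fails
       // with the exception above.
     }
   }
   ```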
   
   **Expected behavior**
   
   The async clustering triggered by the restarted Flink job should succeed even when the plan contains files written by Spark. Is this a bug? Any advice on how to resolve this conflict would be appreciated.
   
   **Environment Description**
   
   * Hudi version : 0.13.1
   
   * Spark version : 2.4/3.4
   
   * Flink version: 1.16
   
   **Additional context**
   
   The repetition of the primary-key field `id` differs between writers: it is `REQUIRED` in the parquet schema written by Flink, but `OPTIONAL` in the schema written by Spark.
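   
   Each field's repetition can be checked directly from a base file's footer with parquet-hadoop; a minimal sketch (the path is the file from the stack trace):
   
   ```java
   // Minimal sketch: print each field's name and repetition (REQUIRED/OPTIONAL)
   // from a parquet file footer, using the parquet-hadoop API.
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.hadoop.ParquetFileReader;
   import org.apache.parquet.hadoop.util.HadoopInputFile;
   import org.apache.parquet.schema.MessageType;
   import org.apache.parquet.schema.Type;
   
   public class PrintParquetRepetition {
     public static void main(String[] args) throws Exception {
       Path path = new Path(
           "file:///tmp/hudi/hudi_cow_sink3/1/"
               + "6aed4a45-4272-4c3f-823d-a8dd34c1817c-0_0-5-330_20250101005816345.parquet");
       try (ParquetFileReader reader =
           ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
         MessageType schema = reader.getFooter().getFileMetaData().getSchema();
         for (Type field : schema.getFields()) {
           // Prints e.g. "id -> REQUIRED" for Flink-written files and
           // "id -> OPTIONAL" for Spark-written ones.
           System.out.println(field.getName() + " -> " + field.getRepetition());
         }
       }
     }
   }
   ```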
   
   Parquet schema written by Flink:
   ```json
   "schema" : {
         "name" : "flink_schema",
         "repetition" : "REPEATED",
         "logicalTypeAnnotation" : null,
         "id" : null,
         "fields" : [ {
           "name" : "_hoodie_commit_time",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_commit_seqno",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_record_key",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_partition_path",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_file_name",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "id",
           "repetition" : "REQUIRED",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT32",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "name",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "dt",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT32",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "ts",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT64",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
      } ]
   }
   ```
   
   Parquet schema written by Spark:
   ```json
   "schema" : {
         "name" : "spark_schema",
         "repetition" : "REPEATED",
         "logicalTypeAnnotation" : null,
         "id" : null,
         "fields" : [ {
           "name" : "_hoodie_commit_time",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_commit_seqno",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_record_key",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_partition_path",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "_hoodie_file_name",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "id",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT32",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "name",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : { },
           "id" : null,
           "primitive" : "BINARY",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "dt",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT32",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
         }, {
           "name" : "ts",
           "repetition" : "OPTIONAL",
           "logicalTypeAnnotation" : null,
           "id" : null,
           "primitive" : "INT64",
           "length" : 0,
           "decimalMeta" : null,
           "columnOrder" : {
             "columnOrderName" : "TYPE_DEFINED_ORDER"
           }
      } ]
   }
   ```
   
   **Stacktrace**
   
   See the exception in the description above.
   
   

